Blame view

mspark/SSC.py 1.06 KB
f69baeb6   Chunk   spark streaming ...
1
2
3
4
5
6
7
8
9
10
11
__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

ece71a0d   Chunk   Streaming! encodi...
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

class StreamSparker(Sparker):
    """Sparker variant that wraps a Spark ``StreamingContext`` fed by a TCP text source.

    The instance remembers a ``source`` host and ``port``; the socket stream
    itself is only created when a job such as ``_word_count`` is built, so
    ``set_datasource`` may be called any time before that.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost', port=9999,
                 batch_duration=1, **kwargs):
        """Create the underlying Spark context and a streaming context on top of it.

        :param host: forwarded to ``Sparker.__init__`` as ``host``.
        :param appname: forwarded to ``Sparker.__init__`` as ``appname``.
        :param source: hostname the text stream will be read from.
        :param port: TCP port the text stream will be read from.
        :param batch_duration: value passed to ``StreamingContext`` as
            ``batchDuration`` (the streaming batch interval). Defaults to 1,
            which was previously hard-coded.
        :param kwargs: accepted for call-site compatibility; not used here.
        """
        # NOTE(review): Sparker.__init__ is expected to set ``self.sc`` (it is
        # read just below); Sparker lives in .SC and is not visible here.
        Sparker.__init__(self, host=host, appname=appname)

        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=batch_duration)

    def start(self):
        """Start the streaming computation and wait for it to terminate.

        This call blocks: it returns only once ``awaitTermination`` returns.
        """
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        """Replace the stored stream host and port.

        Only affects streams created afterwards, because the values are read
        when a job (e.g. ``_word_count``) opens its socket stream.

        :param source: hostname the text stream will be read from.
        :param port: TCP port the text stream will be read from.
        """
        self.source = source
        self.port = port

    def _word_count(self):
        """Build a per-batch word count over the socket text stream and run it.

        Lines read from ``self.source:self.port`` are split on single spaces,
        each token is counted within its batch, and the counts are printed
        with ``pprint``. The method then calls ``start()``, so it blocks
        until the streaming context terminates.
        """
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        # Register the output operation before starting the context.
        wordCounts.pprint()

        self.start()
f69baeb6   Chunk   spark streaming ...

ece71a0d   Chunk   Streaming! encodi...

018ebf56   Chunk   Spark Streaming T...

f69baeb6   Chunk   spark streaming ...

f69baeb6   Chunk   spark streaming ...

ece71a0d   Chunk   Streaming! encodi...

f69baeb6   Chunk   spark streaming ...

ece71a0d   Chunk   Streaming! encodi...

f69baeb6   Chunk   spark streaming ...

ece71a0d   Chunk   Streaming! encodi...