# SSC.py 1.06 KB
__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext


class StreamSparker(Sparker):
    """Sparker subclass that additionally owns a Spark ``StreamingContext``.

    The streaming context is created on top of ``self.sc``, which is expected
    to be set up by ``Sparker.__init__`` (NOTE(review): confirm against the
    ``Sparker`` definition, which lives outside this file).  Text input is
    read from a TCP socket identified by ``self.source`` and ``self.port``.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost', port=9999,
                 batch_duration=1, **kwargs):
        """Initialise the base Sparker and build the streaming context.

        :param host: forwarded to ``Sparker.__init__`` as ``host``.
        :param appname: forwarded to ``Sparker.__init__`` as ``appname``.
        :param source: hostname of the socket text source.
        :param port: TCP port of the socket text source.
        :param batch_duration: batch interval handed to ``StreamingContext``
            as ``batchDuration`` (seconds, per the PySpark API).  Defaults to
            1, the value that used to be hard-coded here.
        :param kwargs: accepted for call compatibility; currently unused.
        """
        # Explicit base-class call is kept on purpose: it works whether or
        # not Sparker is a new-style class.
        Sparker.__init__(self, host=host, appname=appname)

        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=batch_duration)

    def start(self):
        """Start the streaming computation and block until it terminates."""
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        """Replace the socket endpoint used by streams created afterwards.

        Only the stored ``source``/``port`` attributes are updated; nothing
        here touches a stream that has already been created.
        """
        self.source = source
        self.port = port

    def _word_count(self):
        """Run a streaming word count over the configured socket source.

        Each received line is split on single spaces, occurrences of every
        word are summed per batch, and the result is printed with
        ``DStream.pprint``.  This call blocks, because it finishes by
        invoking ``self.start()``.
        """
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        # Output operation must be registered before the context is started.
        wordCounts.pprint()

        self.start()