__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext


class StreamSparker(Sparker):
    """Sparker variant that owns a Spark ``StreamingContext``.

    The streaming context is built on top of ``self.sc``.
    NOTE(review): ``self.sc`` is presumably created by ``Sparker.__init__``
    (not visible in this file) -- confirm against the base class.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp',
                 source='localhost', port=9999, batch_duration=1, **kwargs):
        """Initialise the base Sparker and create the streaming context.

        :param host: forwarded unchanged to ``Sparker.__init__``.
        :param appname: forwarded unchanged to ``Sparker.__init__``.
        :param source: hostname of the socket text source to read from.
        :param port: TCP port of the socket text source.
        :param batch_duration: value passed as ``batchDuration`` to
            ``StreamingContext``; defaults to 1, the value that used to be
            hard-coded here.
        :param kwargs: accepted for call-site compatibility; not used.
        """
        Sparker.__init__(self, host, appname)
        self.source = source
        self.port = port
        self.batch_duration = batch_duration
        self.ssc = StreamingContext(sparkContext=self.sc,
                                    batchDuration=batch_duration)

    def start(self):
        """Start the streaming computation and block until it terminates."""
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        """Replace the socket source used by streams created afterwards.

        :param source: hostname of the socket text source.
        :param port: TCP port of the socket text source.
        """
        self.source = source
        self.port = port

    def _word_count(self):
        """Run a streaming word count over the configured socket source.

        Each incoming line is split on single spaces, occurrences of every
        word are summed, and the resulting counts are printed via
        ``pprint``.  This call does not return until the streaming context
        terminates, because it finishes by calling :meth:`start`.
        """
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        word_counts = pairs.reduceByKey(lambda x, y: x + y)
        word_counts.pprint()

        # Nothing is processed until the context is started; this blocks.
        self.start()