__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

import SocketServer
import pickle
import json
import cjson
import happybase


def recvall(sock):
    """Read from a socket until the peer closes the connection."""
    total_data = []
    while True:
        data = sock.recv(4096)
        if not data:
            break
        total_data.append(data)
    return ''.join(total_data)


class HbaseDumper(object):
    """Writes stream items into an HBase table via happybase."""

    def __init__(self, tablename=None):
        self.table_name = tablename if tablename is not None else "StreamTable"
        self.table = None
        self.connection = None
        self.sparkcontext = None

    def get_table(self):
        """Return the target table, opening the connection and creating the
        table (with its column families) on first use."""
        if self.table is not None:
            return self.table
        if self.connection is None:
            self.connection = happybase.Connection('HPC-server')
        if self.table_name not in self.connection.tables():
            families = {
                'cf_pic': dict(),
                'cf_info': dict(max_versions=10),
                'cf_tag': dict(),
                'cf_feat': dict(),
            }
            self.connection.create_table(name=self.table_name, families=families)
        self.table = self.connection.table(name=self.table_name)
        return self.table

    def store_item(self, item):
        """Store a single item, keyed by item['id']."""
        if self.table is None:
            self.table = self.get_table()
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    def store_items(self, items):
        """Bulk put: buffer all items, then write them in one batch."""
        if self.table is None:
            self.table = self.get_table()
        dict_databuf = {}
        for item in items:
            data = {}
            for key in item.keys():
                data[key + ':'] = item[key]
            dict_databuf[item['id']] = data
        try:
            with self.table.batch(batch_size=5000) as b:
                for rowkey, data in dict_databuf.items():
                    b.put(rowkey, data)
        except ValueError:
            raise


class StreamSparker(Sparker):
    """Spark Streaming driver that consumes a socket text stream."""

    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp',
                 source='localhost', port=9999, **kwargs):
        Sparker.__init__(self, host=host, appname=appname)
        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)

    def start(self):
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        self.source = source
        self.port = port

    def _word_count(self):
        """Classic streaming word count over the socket text stream."""
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)
        wordCounts.pprint()
        self.start()

    class MyTCPHandler(SocketServer.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            # Use the module-level recvall(): this handler has no access to
            # the enclosing StreamSparker instance, so it cannot call a
            # method defined there.
            self.data = recvall(self.request).strip()
            # self.data = self.request.recv(10485760).strip().decode('utf-8').encode('latin-1')
            # item = json.loads(self.data)
            item = cjson.decode(self.data)
            hbasedumper = HbaseDumper(tablename='STREAMTABLE')
            hbasedumper.store_item(item)
            print item

    def _item_extract(self):
        # Alternative: serve items directly over TCP instead of Spark Streaming.
        # SocketServer.TCPServer.allow_reuse_address = True
        # self.sock_s = SocketServer.TCPServer((self.source, self.port), self.MyTCPHandler)
        # self.sock_s.serve_forever()
        lines = self.ssc.socketTextStream(self.source, self.port)
        # DStreams have no collect(); print each micro-batch instead.
        lines.pprint()
        self.start()
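

# ---------------------------------------------------------------------------
# Minimal usage sketch, assuming an HBase Thrift gateway reachable at
# 'HPC-server' and a text source on localhost:9999 (e.g. started with
# `nc -lk 9999`). The row key and payload below are illustrative only;
# the `Sparker` base class comes from the wildcard imports above.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Write one item into the (auto-created) stream table.
    dumper = HbaseDumper(tablename='StreamTable')
    dumper.store_item({'id': 'row-0001', 'data': 'raw image bytes here'})

    # Run the streaming word-count demo; blocks until the streaming
    # context terminates.
    sparker = StreamSparker(host='HPC-server', appname='NewPySparkStreamingApp',
                            source='localhost', port=9999)
    sparker._word_count()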