# SSC.py
__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

import SocketServer
import pickle
import json
import cjson

import happybase


class HbaseDumper(object):
    def __init__(self, tablename=None):
        self.table_name = tablename if tablename is not None else "StreamTable"
        self.table = None
        self.connection = None
        self.sparkcontext = None

    def get_table(self):
        """Lazily open the HBase connection and create the table on first use."""
        if self.table is not None:
            return self.table

        if self.connection is None:
            self.connection = happybase.Connection('HPC-server')

        tables = self.connection.tables()
        if self.table_name not in tables:
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            self.connection.create_table(name=self.table_name, families=families)

        self.table = self.connection.table(name=self.table_name)

        return self.table

    def store_item(self, item):
        if self.table is None:
            self.table = self.get_table()
        # Store the raw picture bytes under the 'cf_pic' column family.
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    # @TODO: Bulk put
    def store_items(self, items):
        if self.table is None:
            self.table = self.get_table()

        # Map each item field to an HBase column ('<key>:'), keyed by the item's id.
        dict_databuf = {}
        for item in items:
            data = {}
            for key in item.keys():
                data[key + ':'] = item[key]
            dict_databuf[item['id']] = data

        # Flush puts in batches of 5000 rows.
        try:
            with self.table.batch(batch_size=5000) as b:
                for rowkey, data in dict_databuf.items():
                    b.put(rowkey, data)
        except ValueError:
            raise
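
# Usage sketch (illustrative only; assumes the 'HPC-server' Thrift gateway above
# and items carrying an 'id' row key plus raw picture bytes under 'data'):
#
#   dumper = HbaseDumper(tablename='StreamTable')
#   dumper.store_item({'id': 'img_00001', 'data': jpeg_bytes})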


class StreamSparker(Sparker):
    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost',
                 port=9999, **kwargs):
        Sparker.__init__(self, host=host, appname=appname)

        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)

    def start(self):
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        self.source = source
        self.port = port

    def _word_count(self):
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        wordCounts.pprint()

        self.start()

    def recvall(self, sock):
        """Read from sock until the sender closes the connection."""
        total_data = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            total_data.append(data)
        return ''.join(total_data)

    class MyTCPHandler(SocketServer.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            # BaseRequestHandler has no recvall(), so drain the socket here
            # until the client closes the connection.
            chunks = []
            while True:
                chunk = self.request.recv(4096)
                if not chunk:
                    break
                chunks.append(chunk)
            self.data = ''.join(chunks).strip()
            item = cjson.decode(self.data)
            hbasedumper = HbaseDumper(tablename='STREAMTABLE')
            hbasedumper.store_item(item)
            print item

    def _item_extract(self):
        # Alternative path: receive items over a raw TCP server instead of Spark Streaming.
        # SocketServer.TCPServer.allow_reuse_address = True
        # self.sock_s = SocketServer.TCPServer((self.source, self.port), self.MyTCPHandler)
        # self.sock_s.serve_forever()

        lines = self.ssc.socketTextStream(self.source, self.port)
        # A DStream has no collect(); print each micro-batch instead.
        lines.pprint()

        self.start()
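

# Minimal driver sketch (illustrative; assumes the Spark cluster configured by
# Sparker and a line-oriented text source, e.g. `nc -lk 9999` on localhost):
if __name__ == '__main__':
    sparker = StreamSparker(host='HPC-server', appname='NewPySparkStreamingApp',
                            source='localhost', port=9999)
    sparker._word_count()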