Blame view

ImageR/imager/mspark/SSC.py 4.3 KB
1f1943eb   qijun   initial commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

import SocketServer
import pickle
import json
import cjson

import happybase


class HbaseDumper(object):
    """Lazily-connected writer that persists stream items into an HBase table.

    The happybase connection and table are created on first use; if the
    table does not exist it is created with the project's standard column
    families (cf_pic / cf_info / cf_tag / cf_feat).
    """

    def __init__(self, tablename=None):
        # Fall back to the shared default table when no name is given.
        self.table_name = tablename if tablename is not None else "StreamTable"
        self.table = None
        self.connection = None
        self.sparkcontex = None

    def get_table(self):
        """Return the happybase table, opening the connection and creating
        the table on first call. Result is cached on the instance."""
        if self.table is not None:
            return self.table

        if self.connection is None:
            # NOTE(review): host is hard-coded to the cluster head node.
            self.connection = happybase.Connection('HPC-server')

        tables = self.connection.tables()
        if self.table_name not in tables:
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            self.connection.create_table(name=self.table_name, families=families)

        self.table = self.connection.table(name=self.table_name)
        return self.table

    def store_item(self, item):
        """Store a single item's picture bytes under row key item['id']."""
        if self.table is None:
            self.table = self.get_table()
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    # @TODO: Bulk put
    def store_items(self, items):
        """Batch-store *items*; row key is item['id'] and every other key is
        written as column-family prefix 'key:'.

        Raises whatever happybase's batch raises on failure (the original
        wrapped this in a try/except ValueError that only re-raised, then an
        unreachable ``pass`` -- a no-op, removed here).
        """
        if self.table is None:
            self.table = self.get_table()

        # NOTE(review): assumes each item key names a column family -- verify.
        dict_databuf = {
            item['id']: {key + ':': value for key, value in item.items()}
            for item in items
        }

        with self.table.batch(batch_size=5000) as b:
            for rowkey, data in dict_databuf.items():
                b.put(rowkey, data)


class StreamSparker(Sparker):
    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost',
                 port=9999, **kwargs):
        Sparker.__init__(self, host=host, appname=appname)

        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)

    def start(self):
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        self.source = source
        self.port = port

    def _word_count(self):
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        wordCounts.pprint()

        self.start()

    def recvall(self, sock):
        total_data = []
        while True:
            data = sock.recv(4096)
            if not data: break
            total_data.append(data)
        return ''.join(total_data)

    class MyTCPHandler(SocketServer.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            self.data = self.recvall(self.request).strip()
            # self.data = self.request.recv(10485760).strip().decode('utf-8').encode('latin-1')
            # item = json.loads(self.data)
            item = cjson.decode(self.data)
            hbasedumper = HbaseDumper(tablename='STREAMTABLE')
            hbasedumper.store_item(item)
            print item

    def _item_extract(self):
        # SocketServer.TCPServer.allow_reuse_address = True
        # self.sock_s = SocketServer.TCPServer((self.source, self.port), self.MyTCPHandler)
        # self.sock_s.serve_forever()

        lines = self.ssc.socketTextStream(self.source, self.port)
        print lines.collect()
        # words = lines.flatMap(lambda line: line.split(" "))
        # pairs = words.map(lambda word: (word, 1))
        # wordCounts = pairs.reduceByKey(lambda x, y: x + y)
        #
        # wordCounts.pprint()

        self.start()