# SSC.py
__author__ = 'chunk'
from ..common import *
from . import *
from .dependencies import *
from .SC import *
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
import SocketServer
import pickle
import json
import cjson
import happybase
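
# This module couples a Spark Streaming source with an HBase sink: HbaseDumper
# persists streamed items into an HBase table via happybase, while StreamSparker
# wraps a StreamingContext around a socket text stream (a word-count demo and an
# item extractor that decodes incoming JSON payloads and hands them to HbaseDumper).
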
class HbaseDumper(object):
    def __init__(self, tablename=None):
        self.table_name = tablename if tablename is not None else "StreamTable"
        self.table = None
        self.connection = None
        self.sparkcontex = None

    def get_table(self):
        if self.table is not None:
            return self.table
        if self.connection is None:
            self.connection = happybase.Connection('HPC-server')
        tables = self.connection.tables()
        if self.table_name not in tables:
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            self.connection.create_table(name=self.table_name, families=families)
        table = self.connection.table(name=self.table_name)
        self.table = table
        return table

    def store_item(self, item):
        if self.table is None:
            self.table = self.get_table()
        # data = {}
        # for key in item.keys():
        #     data[key + ':'] = item[key]
        # self.table.put(item['id'], data)
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    # @TODO: Bulk put
    def store_items(self, items):
        if self.table is None:
            self.table = self.get_table()
        dict_databuf = {}
        for item in items:
            data = {}
            for key in item.keys():
                # Column names must be 'family:qualifier'; appending ':' treats each
                # item key as a column family with an empty qualifier.
                data[key + ':'] = item[key]
            dict_databuf[item['id']] = data
        try:
            with self.table.batch(batch_size=5000) as b:
                for rowkey, data in dict_databuf.items():
                    b.put(rowkey, data)
        except ValueError:
            raise
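

# Illustrative usage sketch (not part of the original module): storing a single
# streamed item with HbaseDumper. The table name and the {'id': ..., 'data': ...}
# item layout are assumptions taken from store_item() above.
#
#     dumper = HbaseDumper(tablename='StreamTable')
#     dumper.store_item({'id': 'img_0001', 'data': open('img_0001.jpg', 'rb').read()})
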
class StreamSparker(Sparker):
    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost',
                 port=9999, **kwargs):
        Sparker.__init__(self, host=host, appname=appname)
        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)

    def start(self):
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        self.source = source
        self.port = port

    def _word_count(self):
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)
        wordCounts.pprint()

        self.start()

    @staticmethod
    def recvall(sock):
        # Read until the peer closes the connection.
        total_data = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            total_data.append(data)
        return ''.join(total_data)

    class MyTCPHandler(SocketServer.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            # recvall is a staticmethod on the outer class, so the nested
            # handler can call it directly.
            self.data = StreamSparker.recvall(self.request).strip()
            # self.data = self.request.recv(10485760).strip().decode('utf-8').encode('latin-1')
            # item = json.loads(self.data)
            item = cjson.decode(self.data)
            hbasedumper = HbaseDumper(tablename='STREAMTABLE')
            hbasedumper.store_item(item)
            print item

    def _item_extract(self):
        # SocketServer.TCPServer.allow_reuse_address = True
        # self.sock_s = SocketServer.TCPServer((self.source, self.port), self.MyTCPHandler)
        # self.sock_s.serve_forever()
        lines = self.ssc.socketTextStream(self.source, self.port)
        # A DStream has no collect(); pprint() prints a sample of each batch instead.
        lines.pprint()
        # words = lines.flatMap(lambda line: line.split(" "))
        # pairs = words.map(lambda word: (word, 1))
        # wordCounts = pairs.reduceByKey(lambda x, y: x + y)
        #
        # wordCounts.pprint()

        self.start()
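

if __name__ == '__main__':
    # Minimal usage sketch, not in the original file: runs the word-count demo
    # against a local socket source (e.g. started with `nc -lk 9999`). The host
    # and app name are simply the defaults assumed above; Sparker is expected to
    # provide self.sc for the StreamingContext.
    sparker = StreamSparker(host='HPC-server', appname='NewPySparkStreamingApp',
                            source='localhost', port=9999)
    sparker._word_count()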