mspark/SSC.py

__author__ = 'chunk'

from ..common import *
from . import *
from .dependencies import *
from .SC import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

import SocketServer
import pickle
import json
import cjson

import happybase


class HbaseDumper(object):
    def __init__(self, tablename=None):
        self.table_name = tablename if tablename is not None else "StreamTable"
        self.table = None
        self.connection = None
        self.sparkcontext = None

    def get_table(self):
        """Return the HBase table, opening the connection and creating the table on first use."""
        if self.table is not None:
            return self.table

        if self.connection is None:
            c = happybase.Connection('HPC-server')
            self.connection = c

        # Create the table (with its column families) if it does not exist yet.
        tables = self.connection.tables()
        if self.table_name not in tables:
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            self.connection.create_table(name=self.table_name, families=families)

        table = self.connection.table(name=self.table_name)

        self.table = table

        return table

    def store_item(self, item):
        if self.table is None:
            self.table = self.get_table()
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    # @TODO: Bulk put
    def store_items(self, items):
        if self.table is None:
            self.table = self.get_table()

        dict_databuf = {}
        for item in items:
            data = {}
            for key in item.keys():
                # Column names are 'family:qualifier'; an empty qualifier
                # stores the value directly under the family.
                data[key + ':'] = item[key]
            dict_databuf[item['id']] = data

        # Flush puts to HBase in chunks of 5000 rows.
        with self.table.batch(batch_size=5000) as b:
            for rowkey, data in dict_databuf.items():
                b.put(rowkey, data)
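
# A minimal usage sketch (added for illustration; not part of the original
# file). It assumes the HBase Thrift server on 'HPC-server' is reachable and
# that items carry the 'id' and 'data' keys store_item() expects:
#
#     dumper = HbaseDumper(tablename='StreamTable')
#     dumper.store_item({'id': 'img_0001', 'data': jpeg_bytes})
#
# Note that store_items() turns every key of an item into a column family
# ('key:'), so item keys there should match the families declared in get_table().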


class StreamSparker(Sparker):
    def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost',
                 port=9999, **kwargs):
        Sparker.__init__(self, host=host, appname=appname)

        self.source = source
        self.port = port
        self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)  # 1-second micro-batches

    def start(self):
        self.ssc.start()
        self.ssc.awaitTermination()

    def set_datasource(self, source='localhost', port=9999):
        self.source = source
        self.port = port

    def _word_count(self):
        lines = self.ssc.socketTextStream(self.source, self.port)
        words = lines.flatMap(lambda line: line.split(" "))
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        wordCounts.pprint()

        self.start()
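
# Usage sketch (added; not in the original): feed text to the socket with
# `nc -lk 9999`, then run the word count. _word_count() blocks inside start()
# until the StreamingContext is stopped.
#
#     sparker = StreamSparker(source='localhost', port=9999)
#     sparker._word_count()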

    @staticmethod
    def recvall(sock):
        """Read from sock until the peer closes; recv() returns '' at EOF."""
        total_data = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            total_data.append(data)
        return ''.join(total_data)

    class MyTCPHandler(SocketServer.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            # This nested handler has no recvall of its own, so reach it
            # through the enclosing class.
            self.data = StreamSparker.recvall(self.request).strip()
            item = cjson.decode(self.data)
            hbasedumper = HbaseDumper(tablename='STREAMTABLE')
            hbasedumper.store_item(item)
            print item

    def _item_extract(self):
        # Alternative, kept for reference: serve the items with a blocking
        # SocketServer instead of a Spark stream.
        # SocketServer.TCPServer.allow_reuse_address = True
        # self.sock_s = SocketServer.TCPServer((self.source, self.port), self.MyTCPHandler)
        # self.sock_s.serve_forever()

        lines = self.ssc.socketTextStream(self.source, self.port)
        # A DStream has no collect(); pprint() prints a sample of each batch.
        lines.pprint()

        self.start()
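

# Hedged end-to-end sketch (added; not part of the original file). Because of
# the relative imports at the top, this file only runs as a module, e.g.
# `python -m mspark.SSC`. Assumes a producer writing cjson-encoded items, one
# per line, to localhost:9999, and the HBase Thrift server on 'HPC-server'.
if __name__ == '__main__':
    sparker = StreamSparker(source='localhost', port=9999)
    sparker._item_extract()  # blocks in ssc.awaitTermination()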