__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *

import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
import json
import pickle
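
# NOTE: `load_env` and `hparams` are expected to be provided by the wildcard
# imports above (..common / .dependencies).  A rough sketch of the shape that
# `hparams` is assumed to have, based on the stock Spark HBase example
# converters (an assumption, not this project's actual values):
#
#   hparams = {
#       "inputFormatClass": "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
#       "readKeyClass": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
#       "readValueClass": "org.apache.hadoop.hbase.client.Result",
#       "readKeyConverter": "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
#       "readValueConverter": "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
#       "outputFormatClass": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
#       "writeKeyClass": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
#       "writeValueClass": "org.apache.hadoop.io.Writable",
#       "writeKeyConverter": "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
#       "writeValueConverter": "org.apache.spark.examples.pythonconverters.StringListToPutConverter",
#   }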


def parse_cv(raw_row):
    """
    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ([0.056273,...],1)
    """
    data = raw_row[1].split('--%--')
    feat = json.loads(data[0].split(':')[-1])
    tag = 1 if data[-1].split(':')[-1] == 'True' else 0
    return (feat, tag)


def format_out(row, cols):
    """
    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
             cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return:
        [('row1', ['row1', 'cf_info', 'id', 1]),
         ('row1', ['row1', 'cf_info', 'size', 3400]),
         ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
    """
    puts = []
    key = row[0]
    for data, col in zip(row[1], cols):
        puts.append((key, [key] + col + [data]))
    return puts
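
# A quick local sanity check for the two helpers above (sample values are
# illustrative only, not taken from any real HBase table):
#
#   >>> parse_cv((u'key0', u'cf_feat:hog:[0.1, 0.2]--%--cf_tag:hog:True'))
#   ([0.1, 0.2], 1)
#   >>> format_out(('row1', [1, 3400, 'hello']),
#   ...            [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']])
#   [('row1', ['row1', 'cf_info', 'id', 1]), ('row1', ['row1', 'cf_info', 'size', 3400]), ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]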


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        print(self.master)

        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize", "10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        #     .set("spark.executor.extraClassPath", extraClassPath) \
        #     .set("SPARK_CLASSPATH", extraClassPath) \
        #     .set("spark.driver.memory", "1G") \
        #     .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)

        self.model = None

    def read_hbase(self, table_name, func=None, collect=False):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }
        hbase_rdd = self.sc.newAPIHadoopRDD(
            inputFormatClass=hparams["inputFormatClass"],
            keyClass=hparams["readKeyClass"],
            valueClass=hparams["readValueClass"],
            keyConverter=hparams["readKeyConverter"],
            valueConverter=hparams["readValueConverter"],
            conf=hconf)

        parser = func if func is not None else parse_cv
        hbase_rdd = hbase_rdd.map(lambda x: parser(x))

        if collect:
            return hbase_rdd.collect()
        else:
            return hbase_rdd

    def write_hbase(self, table_name, data, fromrdd=False, columns=None):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
        Data(from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data(from Rdd):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data

        rdd_data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm_rdd(self, rdd_labeled):
        # Train directly from an RDD of LabeledPoint (named separately so it
        # does not shadow the (X, Y) variant below).
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm

    def train_svm(self, X, Y):
        # data = []
        # for feat, tag in zip(X, Y):
        #     data.append(LabeledPoint(tag, feat))
        # svm = SVMWithSGD.train(self.sc.parallelize(data))
        hdd_data = self.sc.parallelize(zip(X, Y), 20).map(lambda x: LabeledPoint(x[1], x[0]))
        svm = SVMWithSGD.train(hdd_data)
        self.model = svm

        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     model = pickle.dump(svm, modelfile)

        return svm

    def predict_svm(self, x, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")

        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        pass
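
# A minimal, hypothetical usage sketch (the table and column names below are
# placeholders, not values from this project); the class is normally driven
# from elsewhere in the package:
#
#   sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
#   samples = sparker.read_hbase('some_table', collect=True)   # [(feat, tag), ...]
#   X = [feat for feat, tag in samples]
#   Y = [tag for feat, tag in samples]
#   sparker.train_svm(X, Y)
#   print(sparker.predict_svm(X[0]))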