3b4e250d
Chunk
staged.
|
1
|
__author__ = 'chunk'
|
a9c10957
Chunk
hbase-svm & spark...
|
2
3
|
from ..common import *
|
ca73c96f
Chunk
Transformed into ...
|
4
|
from .dependencies import *
|
f69baeb6
Chunk
spark streaming ...
|
5
6
|
from . import *
|
3b4e250d
Chunk
staged.
|
7
8
9
10
11
|
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
|
1c2a3fa0
Chunk
staged.
|
12
|
import json
|
e3e7e73a
Chunk
spider standalone...
|
13
|
import pickle
|
ca73c96f
Chunk
Transformed into ...
|
14
|
|
a9c10957
Chunk
hbase-svm & spark...
|
15
|
|
02528074
Chunk
staged.
|
16
|
def parse_cv(raw_row):
    """
    Convert one raw HBase row into a (features, label) pair.

    input: (row_key, cells) where cells is a single string holding the
           row's cells joined by '--%--', e.g.
           (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:...--%--cf_tag:hog:True')
    return: ([0.056273,...], 1)

    The features are the JSON text after the last ':' of the first cell.
    The label is 1 when the text after the last ':' of the last cell is
    exactly 'True', and 0 otherwise.
    """
    cells = raw_row[1].split('--%--')
    first_cell, last_cell = cells[0], cells[-1]

    # Only the part after the final ':' carries the cell's value.
    feat_text = first_cell.rsplit(':', 1)[-1]
    label_text = last_cell.rsplit(':', 1)[-1]

    if label_text == 'True':
        label = 1
    else:
        label = 0
    return json.loads(feat_text), label
|
3b4e250d
Chunk
staged.
|
26
27
28
|
class Sparker(object):
    """
    Small helper that owns a SparkContext and offers HBase read/write
    plus SVM training/prediction on top of it.

    Relies on names defined outside this class: `load_env`, `hparams`,
    `parse_cv`, `SparkConf`, `SparkContext`, `SVMWithSGD`, `LabeledPoint`.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        """
        Build the Spark configuration and start a SparkContext.

        host: host name; used for the default master URL, as the HBase
              zookeeper quorum, and (see note below) as the Spark home.
        appname: application name registered with Spark.
        kwargs: 'master' overrides the default master URL.
        """
        load_env()
        self.host = host
        self.appname = appname
        # A standalone master URL is spark://HOST:PORT, so the default is
        # derived from the host (it used to be built from the app name,
        # which produced e.g. spark://NewPySparkApp:7077).
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        print(self.master)

        self.conf = SparkConf()
        # NOTE(review): setSparkHome is documented as taking the Spark install
        # path on the workers; a host name is passed here - confirm intent.
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        # .set("spark.executor.extraClassPath", extraClassPath) \
        # .set("SPARK_CLASSPATH", extraClassPath) \
        # .set("spark.driver.memory", "1G") \
        # .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)
        self.model = None

    def read_habase(self, table_name, columns=None):
        """
        Read a whole HBase table and return its rows parsed by parse_cv.

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        table_name: HBase table to scan.
        columns: accepted for interface compatibility but currently NOT
                 applied to the scan. Intended filter format:
                     columns=['cf1:col1', 'cf1:col2']
                 or
                     columns=['cf1']

        return: list of parse_cv results, collected to the driver.
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }
        hbase_rdd = self.sc.newAPIHadoopRDD(
            inputFormatClass=hparams["inputFormatClass"],
            keyClass=hparams["readKeyClass"],
            valueClass=hparams["readValueClass"],
            keyConverter=hparams["readKeyConverter"],
            valueConverter=hparams["readValueConverter"],
            conf=hconf)
        return hbase_rdd.map(parse_cv).collect()

    def write_habase(self, table_name, data):
        """
        Write records into an HBase table.

        Data Format:
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Each record is keyed by its first element before being saved.
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        keyed = self.sc.parallelize(data).map(lambda x: (x[0], x))
        keyed.saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, X, Y=None):
        """
        Train an SVM with SVMWithSGD, remember it in self.model, return it.

        Two call shapes are supported (the class used to define train_svm
        twice, so the second definition silently hid the first):

            train_svm(X, Y)        X: iterable of feature vectors,
                                   Y: iterable of labels, paired with zip.
            train_svm(rdd_labeled) a single argument that is passed to
                                   SVMWithSGD.train unchanged.
        """
        if Y is None:
            # Single-argument form: the caller already built the training RDD.
            rdd_labeled = X
        else:
            points = [LabeledPoint(tag, feat) for feat, tag in zip(X, Y)]
            rdd_labeled = self.sc.parallelize(points)

        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm

    def predict_svm(self, x, model=None):
        """
        Predict with the given model, falling back to self.model.

        raises: Exception("No model available!") when neither a model
                argument nor a previously trained self.model exists.
        """
        if model is None:
            if self.model is None:
                raise Exception("No model available!")
            model = self.model
        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        """
        Placeholder - not implemented; always returns None.
        """
        pass
|
d47ae6ce
Chunk
staged.
|
|
|
3b4e250d
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
3b4e250d
Chunk
staged.
|
|
|
ece71a0d
Chunk
Streaming! encodi...
|
|
|
3b4e250d
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
3b4e250d
Chunk
staged.
|
|
|
d642d837
Chunk
staged.
|
|
|
489c5608
Chunk
debugging...
|
|
|
51708346
Chunk
final experiments...
|
|
|
489c5608
Chunk
debugging...
|
|
|
ece71a0d
Chunk
Streaming! encodi...
|
|
|
489c5608
Chunk
debugging...
|
|
|
d642d837
Chunk
staged.
|
|
|
489c5608
Chunk
debugging...
|
|
|
3b4e250d
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
8bddd8b3
Chunk
You guess what? T...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
8bddd8b3
Chunk
You guess what? T...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
e3ec1f74
Chunk
staged.
|
|
|
4f36b116
Chunk
staged.
|
|
|
e3ec1f74
Chunk
staged.
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
d47ae6ce
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
d47ae6ce
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
26616791
Chunk
RDD-hbase bug fix...
|
|
|
54e2adda
Chunk
staged.
|
|
|
26616791
Chunk
RDD-hbase bug fix...
|
|
|
e3ec1f74
Chunk
staged.
|
|
|
54e2adda
Chunk
staged.
|
|
|
4f36b116
Chunk
staged.
|
|
|
54e2adda
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
018ebf56
Chunk
Spark Streaming T...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
5ec38adb
Chunk
spark-local of da...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
018ebf56
Chunk
Spark Streaming T...
|
|
|
5ec38adb
Chunk
spark-local of da...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
f4fb4381
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
3b4e250d
Chunk
staged.
|
|
|
489c5608
Chunk
debugging...
|
|
|
54e2adda
Chunk
staged.
|
|
|
d642d837
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
3b4e250d
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0a55c5f4
Chunk
staged.
|
|
|
26616791
Chunk
RDD-hbase bug fix...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
d47ae6ce
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
0fbc087e
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
0fbc087e
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
489c5608
Chunk
debugging...
|
|
|
ece71a0d
Chunk
Streaming! encodi...
|
|
|
e3ec1f74
Chunk
staged.
|
|
|
54e2adda
Chunk
staged.
|
|
|
d642d837
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
3b4e250d
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
ece71a0d
Chunk
Streaming! encodi...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
02528074
Chunk
staged.
|
|
|
10b4f63f
Chunk
staged. Before Pa...
|
|
|
02528074
Chunk
staged.
|
|
|
f4fb4381
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
10b4f63f
Chunk
staged. Before Pa...
|
|
|
5ec38adb
Chunk
spark-local of da...
|
|
|
02528074
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
02528074
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
10b4f63f
Chunk
staged. Before Pa...
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
02528074
Chunk
staged.
|
|
|
f20e20ce
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
a9c10957
Chunk
hbase-svm & spark...
|
|
|
02528074
Chunk
staged.
|
|
|