# -*- coding: utf-8 -*-
__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *
import rdd
from .rdd import *
import os
import sys
from pyspark import RDD
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

import numpy as np

np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize", "10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        #     .set("spark.executor.extraClassPath", extraClassPath) \
        #     .set("SPARK_CLASSPATH", extraClassPath) \
        #     .set("spark.driver.memory", "1G") \
        #     .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)
        self.model = None
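    # A minimal usage sketch (assumes a reachable standalone Spark master on
    # the default port 7077; 'HPC-server' is this project's own host name):
    #     sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
    #     sparker.sc.parallelize(range(10)).count()  # smoke-test the context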
    def read_hbase(self, table_name, func=None, collect=False, parallelism=30):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }
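        # `hparams` is not defined in this module; it is expected to arrive via
        # the star imports above. A plausible sketch of its read-side entries,
        # modeled on the stock Spark/HBase Python-converter examples. Treat
        # these class names as assumptions, not this package's actual values:
        #     hparams = {
        #         "inputFormatClass": "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        #         "readKeyClass": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        #         "readValueClass": "org.apache.hadoop.hbase.client.Result",
        #         "readKeyConverter": "org.apache.spark.examples.pythonconverters"
        #                             ".ImmutableBytesWritableToStringConverter",
        #         "readValueConverter": "org.apache.spark.examples.pythonconverters"
        #                               ".HBaseResultToStringConverter",
        #     }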
        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)
        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(lambda x: parser(x))
        if collect:
            return hbase_rdd.collect()
        else:
            """
            RDD-hbase bug fixed (with 'repartition()').
            <http://stackoverflow.com/questions/29011574/how-is-spark-partitioning-from-hdfs>

            When Spark reads a file from HDFS, it creates a single partition for a
            single input split. The input split is set by the Hadoop InputFormat used
            to read this file. For instance, if you use textFile() it would be
            TextInputFormat in Hadoop, which would return you a single partition for
            a single block of HDFS (but the split between partitions would be done on
            line boundaries, not the exact block boundaries), unless you have a
            compressed text file. In the case of a compressed file you would get a
            single partition for a single file, as compressed text files are not
            splittable.
            When you call rdd.repartition(x) it performs a shuffle of the data from
            the N partitions you have in rdd to the x partitions you want to have;
            partitioning is done on a round-robin basis.
            If you have a 30GB uncompressed text file stored on HDFS, then with the
            default HDFS block size setting (128MB) it would be stored in 235 blocks,
            which means that the RDD you read from this file would have 235
            partitions. When you call repartition(1000) your RDD is only marked as
            to-be-repartitioned; it is actually shuffled to 1000 partitions when you
            execute an action on top of this RDD (lazy execution).
            """
            return hbase_rdd.repartition(parallelism)
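    # A hedged usage sketch for read_hbase (the table name here is
    # hypothetical; `rddparse_data_CV` is the module's default row parser):
    #     rows = sparker.read_hbase('ImgCV', collect=True)         # small tables
    #     rdd_rows = sparker.read_hbase('ImgCV', parallelism=60)   # large tables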

    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data
        rdd_data.flatMap(
            lambda x: rdd.format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])
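    # A hedged usage sketch for write_hbase (table and columns are
    # hypothetical; the table and its column families must already exist):
    #     sparker.write_hbase('ImgCV',
    #                         {'row1': [1, 3400, 'hello']},
    #                         columns=['cf_info:id', 'cf_info:size', 'cf_tag:desc'])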

    def train_svm(self, X, Y=None):
        if Y is None:
            # From rdd_labeled
            assert isinstance(X, RDD)
            svm = SVMWithSGD.train(X)
        else:
            # data = []
            # for feat, tag in zip(X, Y):
            #     data.append(LabeledPoint(tag, feat))
            # svm = SVMWithSGD.train(self.sc.parallelize(data))
            hdd_data = self.sc.parallelize(zip(X, Y), 30).map(lambda x: LabeledPoint(x[1], x[0]))
            svm = SVMWithSGD.train(hdd_data)
        self.model = svm
        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     model = pickle.dump(svm, modelfile)
        return self.model

    def predict_svm(self, x, collect=False, model=None):
        """
        From pyspark.mllib.classification.py:

        >>> svm.predict([1.0])
        1
        >>> svm.predict(sc.parallelize([[1.0]])).collect()
        [1]
        >>> svm.clearThreshold()
        >>> svm.predict(array([1.0]))
        1.25...
        """
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")
        res = model.predict(x)
        if collect:
            return res.collect()
        else:
            return res

    def test_svm(self, X, Y=None, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")
        if Y is None:
            assert isinstance(X, RDD)
            pass
        else:
            result_Y = np.array(self.predict_svm(X, collect=True))
            return np.mean(Y == result_Y)
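    # A hedged end-to-end sketch (feature matrix `feats` and label vector
    # `tags` are hypothetical numpy arrays; test_svm returns the fraction of
    # labels that the trained model reproduces):
    #     sparker.train_svm(feats, tags)
    #     acc = sparker.test_svm(feats, tags)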