SC.py

__author__ = 'chunk'

import sys

# load_env(), extraClassPath and sparkJar are expected to come from these
# project-local modules (star-imported below).
from common import *
from dependencies import *

from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
# HBase input/output formats and the key/value converters (typically provided
# by the spark-examples jar on the driver/executor classpath).
hparams = dict(
    inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    readValueClass="org.apache.hadoop.hbase.client.Result",
    readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    readValueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",

    outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    # writeValueClass="org.apache.hadoop.io.Writable",
    writeValueClass="org.apache.hadoop.hbase.client.Put",
    writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
)


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        # Default to the standalone master running on the configured host.
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)

        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        self.conf.set("spark.driver.extraClassPath", extraClassPath) \
            .set("spark.executor.extraClassPath", extraClassPath) \
            .set("SPARK_CLASSPATH", extraClassPath) \
            .set("spark.driver.memory", "1G") \
            .set("spark.yarn.jar", sparkJar)

        # SparkContext's first positional argument is the master URL, so the
        # SparkConf must be passed as a keyword argument.
        self.sc = SparkContext(conf=self.conf)

        self.model = None

    def read_hbase(self, table_name, columns=None):
        """
        Read all rows from an HBase table and collect them to the driver.

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        Note: `columns` is accepted for interface symmetry but is not yet
        applied to the scan.
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        output = hbase_rdd.collect()
        return output

    def write_hbase(self, table_name, data):
        """
        Write rows to an HBase table.

        Data format:
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
            i.e. [[row key, column family, column qualifier, value], ...]
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }

        # Key each record by its row key; the write converter turns the
        # [row, family, qualifier, value] list into an HBase Put.
        self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, rdd_labeled):
        """Train an SVM on an RDD of LabeledPoint and keep it as the current model."""
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm

    def train_svm_local(self, X, Y):
        """Train an SVM from local feature and label sequences."""
        data = [LabeledPoint(y, x) for x, y in zip(X, Y)]
        svm = SVMWithSGD.train(self.sc.parallelize(data))
        self.model = svm
        return svm

    def predict_svm(self, x, model=None):
        """Predict with the given model, falling back to the last trained one."""
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                raise Exception("No model available!")
        return model.predict(x)
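

# --- Usage sketch ------------------------------------------------------------
# A minimal, illustrative driver showing how the class above is meant to be
# wired together. It assumes a reachable Spark master and HBase/ZooKeeper on
# 'HPC-server' and an existing table 'test_table'; both names are placeholders,
# not part of the original module.
if __name__ == '__main__':
    spark = Sparker(host='HPC-server', appname='SCDemo')

    # Write a couple of rows, then read the table back as (rowkey, value) pairs.
    spark.write_hbase('test_table', [["row1", "f1", "c1", "hello"],
                                     ["row2", "f1", "c1", "world"]])
    rows = spark.read_hbase('test_table')
    print(rows)

    # Train an SVM on a toy dataset and score a new point.
    X = [[0.0, 1.0], [1.0, 0.0], [0.9, 0.1], [0.1, 0.8]]
    Y = [0, 1, 1, 0]
    model = spark.train_svm_local(X, Y)
    print(model.predict([0.2, 0.7]))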