Blame view

mspark/SC.py 5.36 KB
3b4e250d   Chunk   staged.
1
__author__ = 'chunk'
a9c10957   Chunk   hbase-svm & spark...
2
3

from ..common import *
ca73c96f   Chunk   Transformed into ...
4
from .dependencies import *
f69baeb6   Chunk   spark streaming ...
5
6
from . import *

3b4e250d   Chunk   staged.
7
8
9
10
11
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
1c2a3fa0   Chunk   staged.
12
import json
e3e7e73a   Chunk   spider standalone...
13
import pickle
ca73c96f   Chunk   Transformed into ...
14

a9c10957   Chunk   hbase-svm & spark...
15

02528074   Chunk   staged.
16
def parse_cv(raw_row):
    r"""Decode one HBase record into a ``(features, label)`` pair.

    Example:
        input:  (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
        return: ([0.056273,...], 1)

    ``raw_row[1]`` is split on ``--%--`` into cells. Only the text after the
    last ``:`` of a cell is used. The first cell's text is decoded as JSON
    (the feature vector). The last cell's text becomes the label: 1 when it is
    exactly ``'True'``, otherwise 0.

    NOTE(review): relies on the feature cell coming first and the tag cell
    last in the record -- confirm against the HBase value converter.
    """
    cells = raw_row[1].split('--%--')
    features = json.loads(cells[0].rsplit(':', 1)[-1])

    flag = cells[-1].rsplit(':', 1)[-1]
    label = 0
    if flag == 'True':
        label = 1
    return features, label

3b4e250d   Chunk   staged.
26
27
28

def format_out(row, cols):
    """Expand one ``(rowkey, values)`` pair into per-cell HBase put records.

    Each value is paired positionally with the column at the same index.
    Extra values or extra columns are ignored, because ``zip`` stops at the
    shorter sequence.

    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
             cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return:
        [('row1', ['row1', 'cf_info', 'id', 1]),
         ('row1', ['row1', 'cf_info', 'size', 3400]),
         ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
    """
    row_key, values = row[0], row[1]
    return [(row_key, [row_key] + qualifier + [value])
            for value, qualifier in zip(values, cols)]


class Sparker(object):
    """Wrapper around a SparkContext for HBase I/O and MLlib SVM training.

    Constructing an instance creates the SparkContext immediately.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        """Create the SparkConf/SparkContext.

        :param host: host name used for the default master URL and as the HBase
            ZooKeeper quorum in read_hbase/write_hbase.
        :param appname: Spark application name.
        :param kwargs: optional ``master`` (default ``'spark://<host>:7077'``) and
            ``spark_home`` (default ``host``, the historical behaviour).
        """
        load_env()

        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        print(self.master)

        # NOTE(review): SparkConf.setSparkHome expects the Spark install path on
        # the workers; passing the host name is kept only as the default for
        # backward compatibility. Override it with spark_home=...
        spark_home = kwargs.get('spark_home', self.host)

        self.conf = SparkConf()
        self.conf.setSparkHome(spark_home) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        self.sc = SparkContext(conf=self.conf)
        self.model = None

    def read_hbase(self, table_name, func=None, collect=False):
        """Scan HBase table ``table_name`` and parse each record.

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        NOTE(review): no column filter is applied by this method; the whole
        table is read.

        :param table_name: HBase table to read.
        :param func: parser applied to every ``(key, value)`` record; defaults
            to ``parse_cv``.
        :param collect: if true, return a Python list instead of the RDD.
        :return: RDD (or list) of parsed records.
        """
        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else parse_cv
        hbase_rdd = hbase_rdd.map(parser)

        if collect:
            return hbase_rdd.collect()
        return hbase_rdd

    def _as_rdd(self, data, fromrdd):
        """Return ``data`` as an RDD of ``(rowkey, values)`` pairs."""
        if fromrdd:
            return data
        if isinstance(data, dict):
            # parallelize() on a dict would distribute only its keys.
            data = list(data.items())
        return self.sc.parallelize(data)

    def write_hbase(self, table_name, data, fromrdd=False, columns=None):
        """Write rows to HBase table ``table_name``.

        Data Format: (Deprecated, no longer handled)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 columns = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data (from RDD, or a list of pairs):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 columns = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']

        :param fromrdd: if true, ``data`` is already an RDD and is used as-is;
            otherwise it is parallelised first.
        :param columns: ``'family:qualifier'`` names, one per value position.
        :raises ValueError: if ``columns`` is not given.
        """
        if columns is None:
            raise ValueError("write_hbase requires columns, e.g. ['cf_info:id', 'cf_tag:desc']")
        # Split on the first ':' only: HBase qualifiers may themselves contain ':'.
        cols = [col.split(':', 1) for col in columns]

        rdd_data = self._as_rdd(data, fromrdd)

        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 "hbase.mapred.outputtable": table_name,
                 "mapreduce.outputformat.class": hparams["outputFormatClass"],
                 "mapreduce.job.output.key.class": hparams["writeKeyClass"],
                 "mapreduce.job.output.value.class": hparams["writeValueClass"],
                 }

        rdd_data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, X, Y=None, num_slices=20):
        """Train an MLlib SVM (SVMWithSGD) and store it as ``self.model``.

        :param X: feature vectors aligned with ``Y``. If ``Y`` is omitted, ``X``
            is instead taken to be an RDD of LabeledPoint and passed straight
            to SVMWithSGD.train.
        :param Y: labels, one per feature vector in ``X``.
        :param num_slices: number of partitions used when parallelising
            ``zip(X, Y)``.
        :return: the trained model.
        """
        if Y is None:
            labeled = X
        else:
            labeled = self.sc.parallelize(list(zip(X, Y)), num_slices) \
                .map(lambda pair: LabeledPoint(pair[1], pair[0]))

        svm = SVMWithSGD.train(labeled)
        self.model = svm
        return svm

    def predict_svm(self, x, model=None):
        """Predict with ``model``, falling back to the last trained model.

        :raises Exception: if neither ``model`` nor ``self.model`` is set.
        """
        if model is None:
            model = self.model
        if model is None:
            raise Exception("No model available!")
        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        """Placeholder: not implemented; returns None."""
        pass
d642d837   Chunk   staged.

489c5608   Chunk   debugging...

51708346   Chunk   final experiments...

489c5608   Chunk   debugging...

ece71a0d   Chunk   Streaming! encodi...

489c5608   Chunk   debugging...

d642d837   Chunk   staged.

489c5608   Chunk   debugging...

3b4e250d   Chunk   staged.

1c2a3fa0   Chunk   staged.

8bddd8b3   Chunk   You guess what? T...

1c2a3fa0   Chunk   staged.

8bddd8b3   Chunk   You guess what? T...

1c2a3fa0   Chunk   staged.

e3ec1f74   Chunk   staged.

4f36b116   Chunk   staged.

e3ec1f74   Chunk   staged.

e3e7e73a   Chunk   spider standalone...

1c2a3fa0   Chunk   staged.

d47ae6ce   Chunk   staged.

0fbc087e   Chunk   staged.

1c2a3fa0   Chunk   staged.

0fbc087e   Chunk   staged.

d47ae6ce   Chunk   staged.

0fbc087e   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

54e2adda   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

e3ec1f74   Chunk   staged.

54e2adda   Chunk   staged.

4f36b116   Chunk   staged.

54e2adda   Chunk   staged.

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

018ebf56   Chunk   Spark Streaming T...

a9c10957   Chunk   hbase-svm & spark...

5ec38adb   Chunk   spark-local of da...

a9c10957   Chunk   hbase-svm & spark...

018ebf56   Chunk   Spark Streaming T...

5ec38adb   Chunk   spark-local of da...

a9c10957   Chunk   hbase-svm & spark...

f4fb4381   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

3b4e250d   Chunk   staged.

489c5608   Chunk   debugging...

54e2adda   Chunk   staged.

d642d837   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

a9c10957   Chunk   hbase-svm & spark...

3b4e250d   Chunk   staged.

ea1eb31a   Chunk   spark is privileg...

0a55c5f4   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

ea1eb31a   Chunk   spark is privileg...

d47ae6ce   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

489c5608   Chunk   debugging...

ece71a0d   Chunk   Streaming! encodi...

e3ec1f74   Chunk   staged.

54e2adda   Chunk   staged.

d642d837   Chunk   staged.

0fbc087e   Chunk   staged.

3b4e250d   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

ece71a0d   Chunk   Streaming! encodi...

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

10b4f63f   Chunk   staged. Before Pa...

02528074   Chunk   staged.

f4fb4381   Chunk   staged.

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

10b4f63f   Chunk   staged. Before Pa...

5ec38adb   Chunk   spark-local of da...

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

10b4f63f   Chunk   staged. Before Pa...

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

f20e20ce   Chunk   staged.

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.