mspark/SC.py

# -*- coding: utf-8 -*-
__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *
from . import rdd
from .rdd import *

import os
import sys
from pyspark import RDD
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

import numpy as np


# Deterministic NumPy RNG seed derived from a fixed string, for reproducible runs.
np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize","10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        # .set("spark.executor.extraClassPath", extraClassPath) \
        # .set("SPARK_CLASSPATH", extraClassPath) \
        # .set("spark.driver.memory", "1G") \
        # .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)

        self.model = None
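
    # Minimal usage sketch (hypothetical names; assumes a standalone Spark
    # master reachable at spark://HPC-server:7077 and that load_env() succeeds):
    #
    #   sparker = Sparker(host='HPC-server', appname='DemoApp')
    #   print sparker.sc.parallelize(range(10)).sum()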

    def read_hbase(self, table_name, func=None, collect=False, parallelism=30):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        """

        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(parser)

        if collect:
            return hbase_rdd.collect()
        else:
            """
            RDD-hbase bug fixed.(with 'repartition()')
            <http://stackoverflow.com/questions/29011574/how-is-spark-partitioning-from-hdfs>

            When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile() it would be TextInputFormat in Hadoop, which would return a single partition for a single HDFS block (but the split between partitions would be done on line boundaries, not the exact block boundaries), unless you have a compressed text file. In the case of a compressed file you would get a single partition for a single file (as compressed text files are not splittable).
            When you call rdd.repartition(x) it performs a shuffle of the data from the N partitions you have in the RDD to the x partitions you want to have; partitioning is done on a round-robin basis.
            If you have a 30GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128MB) it would be stored in 235 blocks, which means that the RDD you read from this file would have 235 partitions. When you call repartition(1000) your RDD would be marked as to be repartitioned, but in fact it would be shuffled to 1000 partitions only when you execute an action on top of this RDD (lazy execution concept).

            """
            return hbase_rdd.repartition(parallelism)
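
    # Hypothetical example (table name illustrative; by default rows are parsed
    # with rddparse_data_CV from this package's rdd module):
    #
    #   rows = sparker.read_hbase('mytable', collect=True)
    #   rdd_rows = sparker.read_hbase('mytable', func=lambda x: x, parallelism=60)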

    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data

        rdd_data.flatMap(
            lambda x: rdd.format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])
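
    # Example matching the docstring formats above (table and column names are
    # illustrative): a dict is parallelized first, an RDD is written as-is.
    #
    #   sparker.write_hbase('mytable',
    #                       {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
    #                       columns=['cf_info:id', 'cf_info:size', 'cf_tag:desc'])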

    def train_svm(self, X, Y=None):
        if Y is None:
            # From rdd_labeled
            assert isinstance(X, RDD)
            svm = SVMWithSGD.train(X)
        else:
            # data = []
            # for feat, tag in zip(X, Y):
            #     data.append(LabeledPoint(tag, feat))
            # svm = SVMWithSGD.train(self.sc.parallelize(data))
            hdd_data = self.sc.parallelize(zip(X, Y), 30).map(lambda x: LabeledPoint(x[1], x[0]))
            svm = SVMWithSGD.train(hdd_data)
        self.model = svm
        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     pickle.dump(svm, modelfile)

        return self.model
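
    # Sketch of both training paths (variable names hypothetical):
    #
    #   model = sparker.train_svm(rdd_of_labeled_points)  # X is an RDD of LabeledPoint
    #   model = sparker.train_svm(feats, tags)            # local arrays -> LabeledPoint RDD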

    def predict_svm(self, x, collect=False, model=None):
        """
        From pyspark.mllib.classification:

            >>> svm.predict([1.0])
            1
            >>> svm.predict(sc.parallelize([[1.0]])).collect()
            [1]
            >>> svm.clearThreshold()
            >>> svm.predict(array([1.0]))
            1.25...
        """
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")

        res = model.predict(x)
        if collect:
            return res.collect()
        else:
            return res

    def test_svm(self, X, Y=None, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")

        if Y is None:
            # TODO: evaluation for RDD input is not implemented yet.
            assert isinstance(X, RDD)
        else:
            result_Y = np.array(self.predict_svm(X, collect=True))
            return np.mean(Y == result_Y)
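

# End-to-end sketch under the assumptions above (all names hypothetical,
# and assuming the parser yields (feature, label) pairs):
#
#   sparker = Sparker(host='HPC-server', appname='SVMDemo')
#   data = sparker.read_hbase('mytable', collect=True)
#   X, Y = [d[0] for d in data], [d[1] for d in data]
#   sparker.train_svm(X, Y)
#   print sparker.test_svm(X, Y)  # mean accuracy over Y == predicted Y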