mspark/SC.py
# -*- coding: utf-8 -*-
__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *
from .rdd import *

import os
import sys
from pyspark import RDD
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

import numpy as np


np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize","10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        # .set("spark.executor.extraClassPath", extraClassPath) \
        # .set("SPARK_CLASSPATH", extraClassPath) \
        # .set("spark.driver.memory", "1G") \
        # .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)

        self.model = None
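
    # A minimal construction sketch (assumed environment: a standalone Spark
    # master reachable at spark://HPC-server:7077, with this package's
    # load_env() importable):
    #
    #   sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
    #   print(sparker.sc.parallelize(range(10)).count())  # -> 10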

    def read_hbase(self, table_name, func=None, collect=False, parallelism=30):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']
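
        A usage sketch (the table name 'ImgCV' is illustrative; the default
        parser rddparse_data_CV and the hparams mapping come from this
        package's sibling modules):

            sparker = Sparker()
            rows = sparker.read_hbase('ImgCV', collect=True)   # list of parsed rows
            rdd = sparker.read_hbase('ImgCV', parallelism=60)  # repartitioned RDD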
        """

        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(parser)

        if collect:
            return hbase_rdd.collect()
        else:
            """
            RDD-HBase bug fixed (with 'repartition()').
            <http://stackoverflow.com/questions/29011574/how-is-spark-partitioning-from-hdfs>

            When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile() it would be TextInputFormat in Hadoop, which would return a single partition for a single HDFS block (but the split between partitions would be done on line boundaries, not the exact block boundaries), unless you have a compressed text file. In the case of a compressed file you would get a single partition for a single file (as compressed text files are not splittable).
            When you call rdd.repartition(x) it performs a shuffle of the data from the N partitions you have in rdd to the x partitions you want to have; partitioning is done on a round-robin basis.
            If you have a 30GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128MB) it would be stored in 235 blocks, which means that the RDD you read from this file would have 235 partitions. When you call repartition(1000) your RDD is marked for repartitioning, but it is actually shuffled to 1000 partitions only when you execute an action on top of this RDD (lazy execution concept).

            """
            return hbase_rdd.repartition(parallelism)

    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format (deprecated):
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
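
        A usage sketch (table and values are illustrative; format_out() and
        hparams come from this package's sibling modules):

            sparker.write_hbase('mytable',
                                {'row1': [1, 3400, 'hello']},
                                columns=['cf_info:id', 'cf_info:size', 'cf_tag:desc'])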
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data

        rdd_data.flatMap(
            lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, X, Y=None):
        if Y is None:
            # From rdd_labeled
            assert isinstance(X, RDD)
            svm = SVMWithSGD.train(X)
        else:
            # data = []
            # for feat, tag in zip(X, Y):
            # data.append(LabeledPoint(tag, feat))
            # svm = SVMWithSGD.train(self.sc.parallelize(data))
            rdd_data = self.sc.parallelize(zip(X, Y), 30).map(lambda x: LabeledPoint(x[1], x[0]))
            svm = SVMWithSGD.train(rdd_data)
        self.model = svm
        # with open('res/svm_spark.model', 'wb') as modelfile:
        # model = pickle.dump(svm, modelfile)

        return self.model
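
    # Training sketch (toy data; X is an (n, d) feature array, Y an (n,) array
    # of 0/1 labels; SVMWithSGD is the legacy pyspark.mllib API):
    #
    #   X = np.random.rand(100, 5)
    #   Y = np.random.randint(0, 2, 100)
    #   model = sparker.train_svm(X, Y)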

    def predict_svm(self, x, collect=False, model=None):
        """
        From pyspark.mllib.classification:

            >>> svm.predict([1.0])
            1
            >>> svm.predict(sc.parallelize([[1.0]])).collect()
            [1]
            >>> svm.clearThreshold()
            >>> svm.predict(array([1.0]))
            1.25...
        """
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                # model = pickle.load(modelfile)
                raise Exception("No model available!")

        res = model.predict(x)
        if collect:
            return res.collect()
        else:
            return res

    def test_svm(self, X, Y=None, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                # model = pickle.load(modelfile)
                raise Exception("No model available!")

        if Y is None:
            # X is assumed to be an RDD of LabeledPoint (this branch was left as 'pass')
            assert isinstance(X, RDD)
            labels = np.array(X.map(lambda p: p.label).collect())
            result_Y = np.array(self.predict_svm(X.map(lambda p: p.features), collect=True))
            return np.mean(labels == result_Y)
        else:
            result_Y = np.array(self.predict_svm(X, collect=True))
            return np.mean(Y == result_Y)
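
# Evaluation sketch (illustrative names: X_test is an RDD of feature vectors,
# Y_test the matching numpy array of true labels):
#
#   acc = sparker.test_svm(X_test, Y_test)
#   print('accuracy: %.3f' % acc)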