Blame view

mspark/SC.py 4.16 KB
3b4e250d   Chunk   staged.
1
# Module author tag.
__author__ = "chunk"
a9c10957   Chunk   hbase-svm & spark...
2
3

import sys
ca73c96f   Chunk   Transformed into ...
4
from common import *
f69baeb6   Chunk   spark streaming ...
5
6
from dependencies import *
from pyspark import SparkConf, SparkContext
3b4e250d   Chunk   staged.
7
8
9
10
11
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# Hadoop I/O classes and Spark Python converters used by Sparker's HBase
# helpers.
#   read*  : TableInputFormat scan -> (row key, Result) pairs, converted by
#            the classes in Spark's examples jar
#            (org.apache.spark.examples.pythonconverters).
#   write* : records keyed by row, converted into HBase Put objects and
#            written through TableOutputFormat.
hparams = dict(
    inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    readValueClass="org.apache.hadoop.hbase.client.Result",
    readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    readValueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",

    outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    writeValueClass="org.apache.hadoop.hbase.client.Put",
    writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
)
3b4e250d   Chunk   staged.
26
27
28


class Sparker(object):
    """Wrap a SparkContext with HBase read/write helpers and MLlib SVM training.

    The constructor calls ``load_env()`` and creates the SparkContext
    immediately. The most recently trained model is kept in ``self.model``.
    """

    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        """Build the SparkConf and start a SparkContext.

        :param host: host name. It is used in the default master URL
            (``spark://<host>:7077``) and as the HBase ZooKeeper quorum in
            ``read_habase``/``write_habase``.
        :param appname: Spark application name.
        :param kwargs: optional ``master`` that overrides the default master URL.
        """
        load_env()
        self.host = host
        self.appname = appname
        # The standalone master listens on the cluster host. It was previously
        # built from the application name, which is not a resolvable host.
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)

        self.conf = SparkConf()
        # NOTE(review): SparkConf.setSparkHome expects the Spark installation
        # path on worker nodes, but it is given the host name here -- confirm.
        (self.conf.setSparkHome(self.host)
         .setMaster(self.master)
         .setAppName(self.appname))
        # extraClassPath / sparkJar come from a star import at file top
        # (not visible in this class).
        (self.conf.set("spark.driver.extraClassPath", extraClassPath)
         .set("spark.executor.extraClassPath", extraClassPath)
         .set("SPARK_CLASSPATH", extraClassPath)
         .set("spark.driver.memory", "1G")
         .set("spark.yarn.jar", sparkJar))

        self.sc = SparkContext(self.conf)

        self.model = None

    def read_habase(self, table_name, columns=None):
        """Scan an HBase table with TableInputFormat and collect it to the driver.

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        A single string such as ``'cf1'`` is treated as a one-element list.
        ``None`` (or an empty list) scans every column.

        :return: list of (key, value) pairs produced by the read key/value
            converters configured in ``hparams``.
        """
        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 }
        if columns:
            if isinstance(columns, str):
                columns = [columns]
            # TableInputFormat.SCAN_COLUMNS: space-delimited list of
            # "family" or "family:qualifier" entries restricting the scan.
            hconf["hbase.mapreduce.scan.columns"] = " ".join(columns)

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        return hbase_rdd.collect()

    def write_habase(self, table_name, data):
        """Write records to an HBase table through TableOutputFormat.

        Data Format:
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Each record is keyed by its first element (the row key), and the whole
        record goes to the write value converter in ``hparams``. The elements
        are presumably [row, family, qualifier, value] -- confirm against that
        converter.
        """
        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 "hbase.mapred.outputtable": table_name,
                 "mapreduce.outputformat.class": hparams["outputFormatClass"],
                 "mapreduce.job.output.key.class": hparams["writeKeyClass"],
                 "mapreduce.job.output.value.class": hparams["writeValueClass"],
                 }

        self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, X, Y=None):
        """Train an SVMWithSGD model, store it as ``self.model``, and return it.

        Two call forms are accepted:

        * ``train_svm(rdd_labeled)``: ``X`` is already an RDD of LabeledPoint
          and is passed to ``SVMWithSGD.train`` as-is.
        * ``train_svm(X, Y)``: ``X`` holds feature vectors and ``Y`` the
          matching labels. They are paired into LabeledPoint objects and
          parallelized on ``self.sc``.
        """
        if Y is None:
            training = X
        else:
            # LabeledPoint's signature is (label, features); zip yields
            # (features, label), so the order is swapped explicitly.
            points = [LabeledPoint(label, features) for features, label in zip(X, Y)]
            training = self.sc.parallelize(points)

        svm = SVMWithSGD.train(training)
        self.model = svm

        return svm

    def predict_svm(self, x, model=None):
        """Predict ``x`` with ``model``, falling back to ``self.model``.

        :raises Exception: if neither ``model`` nor ``self.model`` is set.
        """
        if model is None:
            model = self.model
        if model is None:
            raise Exception("No Model available!")

        return model.predict(x)
3b4e250d   Chunk   staged.

1c2a3fa0   Chunk   staged.

3b4e250d   Chunk   staged.

d47ae6ce   Chunk   staged.

3b4e250d   Chunk   staged.

d47ae6ce   Chunk   staged.

3b4e250d   Chunk   staged.

1c2a3fa0   Chunk   staged.

3b4e250d   Chunk   staged.

ece71a0d   Chunk   Streaming! encodi...

3b4e250d   Chunk   staged.

1c2a3fa0   Chunk   staged.

3b4e250d   Chunk   staged.

d642d837   Chunk   staged.

489c5608   Chunk   debugging...

51708346   Chunk   final experiments...

489c5608   Chunk   debugging...

ece71a0d   Chunk   Streaming! encodi...

489c5608   Chunk   debugging...

d642d837   Chunk   staged.

489c5608   Chunk   debugging...

3b4e250d   Chunk   staged.

1c2a3fa0   Chunk   staged.

8bddd8b3   Chunk   You guess what? T...

1c2a3fa0   Chunk   staged.

8bddd8b3   Chunk   You guess what? T...

1c2a3fa0   Chunk   staged.

e3ec1f74   Chunk   staged.

4f36b116   Chunk   staged.

e3ec1f74   Chunk   staged.

e3e7e73a   Chunk   spider standalone...

1c2a3fa0   Chunk   staged.

d47ae6ce   Chunk   staged.

0fbc087e   Chunk   staged.

1c2a3fa0   Chunk   staged.

0fbc087e   Chunk   staged.

d47ae6ce   Chunk   staged.

0fbc087e   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

54e2adda   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

e3ec1f74   Chunk   staged.

54e2adda   Chunk   staged.

4f36b116   Chunk   staged.

54e2adda   Chunk   staged.

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

018ebf56   Chunk   Spark Streaming T...

a9c10957   Chunk   hbase-svm & spark...

5ec38adb   Chunk   spark-local of da...

a9c10957   Chunk   hbase-svm & spark...

018ebf56   Chunk   Spark Streaming T...

5ec38adb   Chunk   spark-local of da...

a9c10957   Chunk   hbase-svm & spark...

f4fb4381   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

3b4e250d   Chunk   staged.

489c5608   Chunk   debugging...

54e2adda   Chunk   staged.

d642d837   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

a9c10957   Chunk   hbase-svm & spark...

3b4e250d   Chunk   staged.

ea1eb31a   Chunk   spark is privileg...

0a55c5f4   Chunk   staged.

26616791   Chunk   RDD-hbase bug fix...

ea1eb31a   Chunk   spark is privileg...

d47ae6ce   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

0fbc087e   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

489c5608   Chunk   debugging...

ece71a0d   Chunk   Streaming! encodi...

e3ec1f74   Chunk   staged.

54e2adda   Chunk   staged.

d642d837   Chunk   staged.

0fbc087e   Chunk   staged.

3b4e250d   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

ece71a0d   Chunk   Streaming! encodi...

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

10b4f63f   Chunk   staged. Before Pa...

02528074   Chunk   staged.

f4fb4381   Chunk   staged.

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

10b4f63f   Chunk   staged. Before Pa...

5ec38adb   Chunk   spark-local of da...

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

10b4f63f   Chunk   staged. Before Pa...

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.

f20e20ce   Chunk   staged.

02528074   Chunk   staged.

a9c10957   Chunk   hbase-svm & spark...

02528074   Chunk   staged.