__author__ = 'chunk'

import sys

from common import *
from dependencies import *

from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

from numpy import array

# HBase <-> Spark I/O formats and the Python converters shipped with the Spark examples.
hparams = dict(
    inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    readValueClass="org.apache.hadoop.hbase.client.Result",
    readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    readValueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",

    outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    # writeValueClass="org.apache.hadoop.io.Writable",
    writeValueClass="org.apache.hadoop.hbase.client.Put",
    writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
)


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        # The default master URL should point at the cluster host, not the app name.
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)

        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        self.conf.set("spark.driver.extraClassPath", extraClassPath) \
            .set("spark.executor.extraClassPath", extraClassPath) \
            .set("SPARK_CLASSPATH", extraClassPath) \
            .set("spark.driver.memory", "1G") \
            .set("spark.yarn.jar", sparkJar)

        # SparkContext's first positional argument is the master URL,
        # so the configuration must be passed via the 'conf' keyword.
        self.sc = SparkContext(conf=self.conf)

        self.model = None

    def read_habase(self, table_name, columns=None):
        """
        Read an HBase table and return its rows as a list of (rowkey, value) pairs.

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        Note: 'columns' is accepted for API symmetry but is not yet applied as a scan filter.
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(
            inputFormatClass=hparams["inputFormatClass"],
            keyClass=hparams["readKeyClass"],
            valueClass=hparams["readValueClass"],
            keyConverter=hparams["readKeyConverter"],
            valueConverter=hparams["readValueConverter"],
            conf=hconf)

        output = hbase_rdd.collect()
        return output

    def write_habase(self, table_name, data):
        """
        Write rows into an HBase table.

        Data format, one list per cell: [rowkey, column family, qualifier, value]
        e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }

        self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, rdd_labeled):
        """Train a linear SVM on an RDD of LabeledPoint."""
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm

    def train_svm_from_arrays(self, X, Y):
        """Train a linear SVM from parallel feature/label sequences."""
        data = [LabeledPoint(label, features) for features, label in zip(X, Y)]
        svm = SVMWithSGD.train(self.sc.parallelize(data))
        self.model = svm
        return svm

    def predict_svm(self, x, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                raise Exception("No model available!")
        return model.predict(x)
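
# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module).
# It assumes a reachable Spark master and HBase quorum on 'HPC-server' and a
# hypothetical table 'demo_table' whose cell values are comma-separated
# "label,feat1,feat2,..." strings; both the table name and the parsing
# scheme are assumptions and should be adapted to the real schema.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    sparker = Sparker(host='HPC-server', appname='SparkerDemo')

    # Read raw (rowkey, value) pairs from the (assumed) HBase table.
    rows = sparker.read_habase('demo_table')

    def parse_row(row):
        # Placeholder parser: treat the cell value as "label,feat1,feat2,...".
        fields = [float(v) for v in row[1].split(',')]
        return LabeledPoint(fields[0], fields[1:])

    points = [parse_row(r) for r in rows]

    # Train an SVM on the parsed points and score the first feature vector.
    model = sparker.train_svm(sparker.sc.parallelize(points))
    print(sparker.predict_svm(points[0].features))

    # Write a prediction back as [rowkey, column family, qualifier, value].
    sparker.write_habase('demo_table', [["row1", "f1", "pred", "0"]])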