__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *

import sys
import json
import pickle

from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array


def parse_cv(raw_row):
    """
    input:  (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ([0.056273,...], 1)
    """
    data = raw_row[1].split('--%--')
    feat = json.loads(data[0].split(':')[-1])             # feature vector stored as a JSON list
    tag = 1 if data[-1].split(':')[-1] == 'True' else 0   # binary label from the tag column
    return (feat, tag)


def format_out(row, cols):
    """
    input:  e.g.
        row  = ('row1', [1, 3400, 'hello'])
        cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return: [('row1', ['row1', 'cf_info', 'id', 1]),
             ('row1', ['row1', 'cf_info', 'size', 3400]),
             ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
    """
    puts = []
    key = row[0]
    for data, col in zip(row[1], cols):
        puts.append((key, [key] + col + [data]))
    return puts


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        print(self.master)

        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize", "10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        #     .set("spark.executor.extraClassPath", extraClassPath) \
        #     .set("SPARK_CLASSPATH", extraClassPath) \
        #     .set("spark.driver.memory", "1G") \
        #     .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)
        self.model = None

    def read_hbase(self, table_name, func=None, collect=False):
        """
        Read an HBase table into an RDD and parse each row with `func`
        (defaults to parse_cv).

        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']  or  columns=['cf1']
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else parse_cv
        hbase_rdd = hbase_rdd.map(parser)

        if collect:
            return hbase_rdd.collect()
        else:
            return hbase_rdd

    def write_hbase(self, table_name, data, fromrdd=False, columns=None):
        """
        Write rows to an HBase table.

        Data format (deprecated):
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']

        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {
            "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }
        cols = [col.split(':') for col in columns]

        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data  # `data` is already an RDD of (key, values) pairs

        rdd_data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm_rdd(self, rdd_labeled):
        """Train an SVM from an RDD of LabeledPoint."""
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm

    def train_svm(self, X, Y):
        """Train an SVM from parallel sequences of feature vectors X and labels Y."""
        # data = []
        # for feat, tag in zip(X, Y):
        #     data.append(LabeledPoint(tag, feat))
        # svm = SVMWithSGD.train(self.sc.parallelize(data))
        hdd_data = self.sc.parallelize(list(zip(X, Y)), 20).map(lambda x: LabeledPoint(x[1], x[0]))
        svm = SVMWithSGD.train(hdd_data)
        self.model = svm

        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     model = pickle.dump(svm, modelfile)

        return svm

    def predict_svm(self, x, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")
        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        pass
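

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). Assumptions: a Spark master
# is reachable at spark://HPC-server:7077, `hparams` and `load_env` are
# provided by the wildcard imports above, and 'demo_table' is a placeholder
# name for an HBase table whose rows match the layout expected by parse_cv.
# Because the module uses relative imports, run it as part of its package,
# e.g. `python -m <package>.<this_module>`.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    sparker = Sparker(host='HPC-server', appname='NewPySparkApp')

    # Pull (feature, tag) pairs out of HBase and turn them into LabeledPoints.
    rdd = sparker.read_hbase('demo_table')
    labeled = rdd.map(lambda pair: LabeledPoint(pair[1], pair[0]))

    # Train an SVM on the labeled RDD and score one sample with it.
    model = sparker.train_svm_rdd(labeled)
    print(model.predict(labeled.first().features))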