From d642d837fa065c133187e9c8f8dbc0283103bfe6 Mon Sep 17 00:00:00 2001 From: Chunk Date: Thu, 16 Apr 2015 14:02:09 +0800 Subject: [PATCH] staged. --- mspark/SC.py | 26 +++++++++++++------------- test/test_whole.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 13 deletions(-) create mode 100644 test/test_whole.py diff --git a/mspark/SC.py b/mspark/SC.py index a8f6f9f..42de018 100644 --- a/mspark/SC.py +++ b/mspark/SC.py @@ -155,6 +155,7 @@ def rddembed_ILS(row, rate=None): tmpf_src.close() tmpf_dst.close() + def rddembed_ILS_EXT(row, rate=None): """ input: @@ -188,7 +189,7 @@ def rddembed_ILS_EXT(row, rate=None): raw = tmpf_dst.read() index = md5(raw).hexdigest() - return [row,(index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))] + return [row, (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))] except Exception as e: print e @@ -282,10 +283,10 @@ class Sparker(object): """ hconf = { - "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", - #"hbase.zookeeper.quorum": self.host, - "hbase.mapreduce.inputtable": table_name, - } + "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", + # "hbase.zookeeper.quorum": self.host, + "hbase.mapreduce.inputtable": table_name, + } hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"], keyClass=hparams["readKeyClass"], @@ -315,14 +316,13 @@ class Sparker(object): cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc'] """ hconf = { - "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", - #"hbase.zookeeper.quorum": self.host, - "hbase.mapreduce.inputtable": table_name, - "hbase.mapred.outputtable": table_name, - "mapreduce.outputformat.class": hparams["outputFormatClass"], - "mapreduce.job.output.key.class": hparams["writeKeyClass"], - "mapreduce.job.output.value.class": hparams["writeValueClass"], - } + "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", # "hbase.zookeeper.quorum": self.host, + "hbase.mapreduce.inputtable": table_name, + "hbase.mapred.outputtable": table_name, + "mapreduce.outputformat.class": hparams["outputFormatClass"], + "mapreduce.job.output.key.class": hparams["writeKeyClass"], + "mapreduce.job.output.value.class": hparams["writeValueClass"], + } cols = [col.split(':') for col in columns] if not fromrdd: rdd_data = self.sc.parallelize(data) diff --git a/test/test_whole.py b/test/test_whole.py new file mode 100644 index 0000000..88eff32 --- /dev/null +++ b/test/test_whole.py @@ -0,0 +1,56 @@ +__author__ = 'chunk' + +from ..mspark import SC +from pyspark.mllib.regression import LabeledPoint + + +cols0 = [ + 'cf_pic:data', + 'cf_info:width', + 'cf_info:height', + 'cf_info:size', + 'cf_info:capacity', + 'cf_info:quality', + 'cf_info:rate', + 'cf_tag:chosen', + 'cf_tag:class' +] +cols1 = [ + 'cf_pic:data', + 'cf_info:width', + 'cf_info:height', + 'cf_info:size', + 'cf_info:capacity', + 'cf_info:quality', + 'cf_info:rate', + 'cf_tag:chosen', + 'cf_tag:class', + 'cf_feat:bid', +] + +sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') + +rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \ + .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \ + .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ + .mapValues(lambda items: SC.rddfeat_ILS(items)) + +sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1, + withdata=True) + + + + + + + + + + + + + + + + + -- libgit2 0.21.2