From 54e2adda230421077d556c3d775d9f399b82652e Mon Sep 17 00:00:00 2001
From: Chunk
Date: Thu, 16 Apr 2015 21:15:15 +0800
Subject: [PATCH] staged.

---
 mspark/SC.py       |  89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 test/test_data.py  |  14 +++++++-------
 test/test_whole.py | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------
 3 files changed, 179 insertions(+), 58 deletions(-)

diff --git a/mspark/SC.py b/mspark/SC.py
index 42de018..03ab128 100644
--- a/mspark/SC.py
+++ b/mspark/SC.py
@@ -247,6 +247,77 @@ def format_out(row, cols, withdata=False):
             puts.append((key, [key] + col + [str(data)]))
     return puts
 
+# scconf = SparkConf()
+# scconf.setSparkHome("HPC-server") \
+#     .setMaster("spark://HPC-server:7077") \
+#     .setAppName("example")
+# sc = SparkContext(conf=scconf)
+#
+#
+# def read_hbase(table_name, func=None, collect=False):
+#     """
+#     ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
+#
+#     Filter format:
+#         columns=['cf1:col1', 'cf1:col2']
+#         or
+#         columns=['cf1']
+#
+#     """
+#
+#     hconf = {
+#         "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
+#         # "hbase.zookeeper.quorum": self.host,
+#         "hbase.mapreduce.inputtable": table_name,
+#     }
+#
+#     hbase_rdd = sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
+#                                    keyClass=hparams["readKeyClass"],
+#                                    valueClass=hparams["readValueClass"],
+#                                    keyConverter=hparams["readKeyConverter"],
+#                                    valueConverter=hparams["readValueConverter"],
+#                                    conf=hconf)
+#
+#     parser = func if func != None else rddparse_data_CV
+#     hbase_rdd = hbase_rdd.map(lambda x: parser(x))
+#
+#     if collect:
+#         return hbase_rdd.collect()
+#     else:
+#         return hbase_rdd
+#
+#
+# def write_hbase(table_name, data, fromrdd=False, columns=None, withdata=False):
+#     """
+#     Data Format: (Deprecated)
+#         e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
+#
+#     Data(from dictionary):
+#         e.g. data ={'row1':[1,3400,'hello'], 'row2':[34,5000,'here in mine']},
+#              cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
+#     Data(from Rdd):
+#         e.g. data =[('row1',[1,3400,'hello']), ('row2',[34,5000,'here in mine'])],
+#              cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
+#     """
+#     hconf = {
+#         "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",  # "hbase.zookeeper.quorum": self.host,
+#         "hbase.mapreduce.inputtable": table_name,
+#         "hbase.mapred.outputtable": table_name,
+#         "mapreduce.outputformat.class": hparams["outputFormatClass"],
+#         "mapreduce.job.output.key.class": hparams["writeKeyClass"],
+#         "mapreduce.job.output.value.class": hparams["writeValueClass"],
+#     }
+#     cols = [col.split(':') for col in columns]
+#     if not fromrdd:
+#         rdd_data = sc.parallelize(data)
+#     else:
+#         rdd_data = data
+#
+#     rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
+#         conf=hconf,
+#         keyConverter=hparams["writeKeyConverter"],
+#         valueConverter=hparams["writeValueConverter"])
+
 
 class Sparker(object):
     def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
@@ -283,9 +354,9 @@ class Sparker(object):
         """
 
         hconf = {
-            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
-            # "hbase.zookeeper.quorum": self.host,
-            "hbase.mapreduce.inputtable": table_name,
+            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
+            # "hbase.zookeeper.quorum": self.host,
+            "hbase.mapreduce.inputtable": table_name,
         }
 
         hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
@@ -316,12 +387,12 @@ class Sparker(object):
                  cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
         """
         hconf = {
-            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",  # "hbase.zookeeper.quorum": self.host,
-            "hbase.mapreduce.inputtable": table_name,
-            "hbase.mapred.outputtable": table_name,
-            "mapreduce.outputformat.class": hparams["outputFormatClass"],
-            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
-            "mapreduce.job.output.value.class": hparams["writeValueClass"],
+            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",  # "hbase.zookeeper.quorum": self.host,
+            "hbase.mapreduce.inputtable": table_name,
+            "hbase.mapred.outputtable": table_name,
+            "mapreduce.outputformat.class": hparams["outputFormatClass"],
+            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
+            "mapreduce.job.output.value.class": hparams["writeValueClass"],
         }
         cols = [col.split(':') for col in columns]
         if not fromrdd:
diff --git a/test/test_data.py b/test/test_data.py
index c02a004..4a48d30 100755
--- a/test/test_data.py
+++ b/test/test_data.py
@@ -93,13 +93,13 @@ def test_ILSVRC_S_LOCAL():
 
 def test_ILSVRC_S_SPARK():
     timer = Timer()
 
-    # timer.mark()
-    # dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Test_1')
-    # dil.delete_table()
-    # dil.format()
-    # dil.store_img()
-    # timer.report()
-    # return
+    timer.mark()
+    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Test_1')
+    dil.delete_table()
+    dil.format()
+    dil.store_img()
+    timer.report()
+    return
 
     dils = ILSVRC_S.DataILSVRC_S(base='ILSVRC2013_DET_val', category='Test_1')
diff --git a/test/test_whole.py b/test/test_whole.py
index 88eff32..6146376 100644
--- a/test/test_whole.py
+++ b/test/test_whole.py
@@ -2,48 +2,98 @@ __author__ = 'chunk'
 
 from ..mspark import SC
 from pyspark.mllib.regression import LabeledPoint
-
-
-cols0 = [
-    'cf_pic:data',
-    'cf_info:width',
-    'cf_info:height',
-    'cf_info:size',
-    'cf_info:capacity',
-    'cf_info:quality',
-    'cf_info:rate',
-    'cf_tag:chosen',
-    'cf_tag:class'
-]
-cols1 = [
-    'cf_pic:data',
-    'cf_info:width',
-    'cf_info:height',
-    'cf_info:size',
-    'cf_info:capacity',
-    'cf_info:quality',
-    'cf_info:rate',
-    'cf_tag:chosen',
-    'cf_tag:class',
-    'cf_feat:bid',
-]
-
-sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
-
-rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
-    .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
-    .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
-    .mapValues(lambda items: SC.rddfeat_ILS(items))
-
-sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
-                    withdata=True)
-
-
-
-
-
-
-
+import happybase
+
+def test_whole():
+    cols0 = [
+        'cf_pic:data',
+        'cf_info:width',
+        'cf_info:height',
+        'cf_info:size',
+        'cf_info:capacity',
+        'cf_info:quality',
+        'cf_info:rate',
+        'cf_tag:chosen',
+        'cf_tag:class'
+    ]
+    cols1 = [
+        'cf_pic:data',
+        'cf_info:width',
+        'cf_info:height',
+        'cf_info:size',
+        'cf_info:capacity',
+        'cf_info:quality',
+        'cf_info:rate',
+        'cf_tag:chosen',
+        'cf_tag:class',
+        'cf_feat:bid',
+    ]
+
+    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
+
+    # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
+    #     .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
+    #     .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
+    #     .mapValues(lambda items: SC.rddfeat_ILS(items))
+
+    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues(
+        lambda data: [data] + SC.rddinfo_ILS(data))
+    rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(lambda x: x != None)
+
+    rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))
+
+    print len(rdd_data.collect())
+
+    # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
+    #                     withdata=True)
+
+
+def test_whole_ext():
+    table_name = "ILSVRC2013_DET_val-Test_1"
+    connection = happybase.Connection('HPC-server')
+    tables = connection.tables()
+    if table_name not in tables:
+        families = {'cf_pic': dict(),
+                    'cf_info': dict(max_versions=10),
+                    'cf_tag': dict(),
+                    'cf_feat': dict(),
+                    }
+        connection.create_table(name=table_name, families=families)
+    table = connection.table(name=table_name)
+
+    cols = ['cf_pic:data']
+    list_data = []
+    for key, data in table.scan(columns=cols):
+        data = data['cf_pic:data']
+        list_data.append((key, data))
+
+    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
+    rdd_data = sparker.sc.parallelize(list_data, 20) \
+        .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
+        .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
+        .mapValues(lambda items: SC.rddfeat_ILS(items))
+
+    rrr = rdd_data.collect()
+    print "-----------------", len(rrr), "===================="
+    print "+++++++++++++++++", rrr[0], "**********************"
+    # try:
+    #     with table.batch(batch_size=5000) as b:
+    #         for imgname, imginfo in rdd_data.collect().items():
+    #             b.put(imgname,
+    #                   {
+    #                       'cf_pic:data': imginfo[0],
+    #                       'cf_info:width': str(imginfo[1]),
+    #                       'cf_info:height': str(imginfo[2]),
+    #                       'cf_info:size': str(imginfo[3]),
+    #                       'cf_info:capacity': str(imginfo[4]),
+    #                       'cf_info:quality': str(imginfo[5]),
+    #                       'cf_info:rate': str(imginfo[6]),
+    #                       'cf_tag:chosen': str(imginfo[7]),
+    #                       'cf_tag:class': str(imginfo[8]),
+    #                       'cf_feat:' + feattype: imginfo[9],
+    #                   })
+    # except ValueError:
+    #     raise
--
libgit2 0.21.2
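
Note for reviewers: a minimal sketch of the end-to-end Sparker read/write path this patch
exercises, mirroring the module-level code removed from test/test_whole.py and the
commented-out write_hbase call in test_whole(). It assumes mspark is importable as a
top-level package and that mspark.SC exposes Sparker, rddparse_data_ILS, rddinfo_ILS,
rddembed_ILS_EXT and rddfeat_ILS as referenced in the diffs above; it is illustrative
only, not part of the patch.

    from mspark import SC

    # Column qualifiers written back to HBase; positions match the value list
    # built by the mapValues/flatMap chain below.
    cols1 = ['cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size',
             'cf_info:capacity', 'cf_info:quality', 'cf_info:rate',
             'cf_tag:chosen', 'cf_tag:class', 'cf_feat:bid']

    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
                         master='spark://HPC-server:7077')

    # Read (rowkey, image bytes) pairs, derive per-image info, embed extra
    # samples, then extract features for every row.
    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1",
                                  func=SC.rddparse_data_ILS, collect=False) \
        .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
        .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
        .mapValues(lambda items: SC.rddfeat_ILS(items))

    # Write the enriched rows back; `columns` maps each position in the value
    # list to an HBase column qualifier, as described in the write_hbase docstring.
    sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True,
                        columns=cols1, withdata=True)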