From 5c9c44da22fd65b593e5b7b7cf5d788c5f11a930 Mon Sep 17 00:00:00 2001 From: Chunk Date: Thu, 16 Apr 2015 21:29:58 +0800 Subject: [PATCH] staged. --- test/test_whole.py | 56 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/test/test_whole.py b/test/test_whole.py index 6146376..c3d1ae0 100644 --- a/test/test_whole.py +++ b/test/test_whole.py @@ -4,6 +4,7 @@ from ..mspark import SC from pyspark.mllib.regression import LabeledPoint import happybase + def test_whole(): cols0 = [ 'cf_pic:data', @@ -33,7 +34,7 @@ def test_whole(): # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \ # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \ - # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ + # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ # .mapValues(lambda items: SC.rddfeat_ILS(items)) rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues( @@ -64,36 +65,37 @@ def test_whole_ext(): cols = ['cf_pic:data'] list_data = [] for key, data in table.scan(columns=cols): - data = data['cf_pic:data'] - list_data.append((key,data)) + data = data['cf_pic:data'] + list_data.append((key, data)) sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') - rdd_data = sparker.sc.parallelize(list_data,20)\ - .mapValues(lambda data: [data] + SC.rddinfo_ILS(data))\ - .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2))\ + rdd_data = sparker.sc.parallelize(list_data, 40) \ + .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \ + .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ .mapValues(lambda items: SC.rddfeat_ILS(items)) - rrr = rdd_data.collect() - print "-----------------",len(rrr),"====================" - print "+++++++++++++++++",rrr[0],"**********************" - # try: - # with table.batch(batch_size=5000) as b: - # for imgname, imginfo in rdd_data.collect().items(): - # b.put(imgname, - # { - # 'cf_pic:data': imginfo[0], - # 'cf_info:width': str(imginfo[1]), - # 'cf_info:height': str(imginfo[2]), - # 'cf_info:size': str(imginfo[3]), - # 'cf_info:capacity': str(imginfo[4]), - # 'cf_info:quality': str(imginfo[5]), - # 'cf_info:rate': str(imginfo[6]), - # 'cf_tag:chosen': str(imginfo[7]), - # 'cf_tag:class': str(imginfo[8]), - # 'cf_feat:' + feattype: imginfo[9], - # }) - # except ValueError: - # raise + # rrr = rdd_data.collect() + # print "-----------------", len(rrr), "====================" + # print "+++++++++++++++++", rrr[0], "**********************" + try: + with table.batch(batch_size=5000) as b: + for item in rdd_data.collect(): + imgname, imginfo = item[0], item[1] + b.put(imgname, + { + 'cf_pic:data': imginfo[0], + 'cf_info:width': str(imginfo[1]), + 'cf_info:height': str(imginfo[2]), + 'cf_info:size': str(imginfo[3]), + 'cf_info:capacity': str(imginfo[4]), + 'cf_info:quality': str(imginfo[5]), + 'cf_info:rate': str(imginfo[6]), + 'cf_tag:chosen': str(imginfo[7]), + 'cf_tag:class': str(imginfo[8]), + 'cf_feat:ibd' : imginfo[9], + }) + except ValueError: + raise -- libgit2 0.21.2