Commit 5c9c44da22fd65b593e5b7b7cf5d788c5f11a930
1 parent: 54e2adda
Exists in master and in 1 other branch: staged.
Showing 1 changed file with 29 additions and 27 deletions.
Show diff stats
test/test_whole.py
... | ... | @@ -4,6 +4,7 @@ from ..mspark import SC |
4 | 4 | from pyspark.mllib.regression import LabeledPoint |
5 | 5 | import happybase |
6 | 6 | |
7 | + | |
7 | 8 | def test_whole(): |
8 | 9 | cols0 = [ |
9 | 10 | 'cf_pic:data', |
... | ... | @@ -33,7 +34,7 @@ def test_whole(): |
33 | 34 | |
34 | 35 | # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \ |
35 | 36 | # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \ |
36 | - # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ | |
37 | + # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ | |
37 | 38 | # .mapValues(lambda items: SC.rddfeat_ILS(items)) |
38 | 39 | |
39 | 40 | rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues( |
... | ... | @@ -64,36 +65,37 @@ def test_whole_ext(): |
64 | 65 | cols = ['cf_pic:data'] |
65 | 66 | list_data = [] |
66 | 67 | for key, data in table.scan(columns=cols): |
67 | - data = data['cf_pic:data'] | |
68 | - list_data.append((key,data)) | |
68 | + data = data['cf_pic:data'] | |
69 | + list_data.append((key, data)) | |
69 | 70 | |
70 | 71 | sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') |
71 | - rdd_data = sparker.sc.parallelize(list_data,20)\ | |
72 | - .mapValues(lambda data: [data] + SC.rddinfo_ILS(data))\ | |
73 | - .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2))\ | |
72 | + rdd_data = sparker.sc.parallelize(list_data, 40) \ | |
73 | + .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \ | |
74 | + .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \ | |
74 | 75 | .mapValues(lambda items: SC.rddfeat_ILS(items)) |
75 | 76 | |
76 | - rrr = rdd_data.collect() | |
77 | - print "-----------------",len(rrr),"====================" | |
78 | - print "+++++++++++++++++",rrr[0],"**********************" | |
79 | - # try: | |
80 | - # with table.batch(batch_size=5000) as b: | |
81 | - # for imgname, imginfo in rdd_data.collect().items(): | |
82 | - # b.put(imgname, | |
83 | - # { | |
84 | - # 'cf_pic:data': imginfo[0], | |
85 | - # 'cf_info:width': str(imginfo[1]), | |
86 | - # 'cf_info:height': str(imginfo[2]), | |
87 | - # 'cf_info:size': str(imginfo[3]), | |
88 | - # 'cf_info:capacity': str(imginfo[4]), | |
89 | - # 'cf_info:quality': str(imginfo[5]), | |
90 | - # 'cf_info:rate': str(imginfo[6]), | |
91 | - # 'cf_tag:chosen': str(imginfo[7]), | |
92 | - # 'cf_tag:class': str(imginfo[8]), | |
93 | - # 'cf_feat:' + feattype: imginfo[9], | |
94 | - # }) | |
95 | - # except ValueError: | |
96 | - # raise | |
77 | + # rrr = rdd_data.collect() | |
78 | + # print "-----------------", len(rrr), "====================" | |
79 | + # print "+++++++++++++++++", rrr[0], "**********************" | |
80 | + try: | |
81 | + with table.batch(batch_size=5000) as b: | |
82 | + for item in rdd_data.collect(): | |
83 | + imgname, imginfo = item[0], item[1] | |
84 | + b.put(imgname, | |
85 | + { | |
86 | + 'cf_pic:data': imginfo[0], | |
87 | + 'cf_info:width': str(imginfo[1]), | |
88 | + 'cf_info:height': str(imginfo[2]), | |
89 | + 'cf_info:size': str(imginfo[3]), | |
90 | + 'cf_info:capacity': str(imginfo[4]), | |
91 | + 'cf_info:quality': str(imginfo[5]), | |
92 | + 'cf_info:rate': str(imginfo[6]), | |
93 | + 'cf_tag:chosen': str(imginfo[7]), | |
94 | + 'cf_tag:class': str(imginfo[8]), | |
95 | + 'cf_feat:ibd' : imginfo[9], | |
96 | + }) | |
97 | + except ValueError: | |
98 | + raise | |
97 | 99 | |
98 | 100 | |
99 | 101 | ... | ... |