Commit 8bddd8b35aabd5333192647471c7e198890081cf
1 parent: 1c2a3fa0 (exists in master and in 1 other branch)
Guess what? Through all the 'debugs' and 'f**ks', we have finally finished the pyspark & hbase!
Showing 5 changed files with 22 additions and 12 deletions
mdata/ILSVRC_S.py
@@ -361,7 +361,7 @@ class DataILSVRC_S(DataDumperBase):
         if readforward:
             self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
 
-        rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x))
+        rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x)).filter(lambda x: x != None)
         self.rdd_data = self.rdd_data.union(rdd_data_ext)
 
         if not writeback:
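The new .filter(lambda x: x != None) matters because SC.rddembed_ILS presumably returns None when embedding a row fails (its sibling rddfeat_ILS below has exactly such an exception path), and unioning those Nones into self.rdd_data would break the later stages. A minimal sketch of the map-then-filter pattern with toy data, no Spark needed; embed() here is a stand-in for the real rddembed_ILS:

    # embed() stands in for SC.rddembed_ILS, which returns None on failure.
    def embed(row):
        key, items = row
        if not items:                      # failure path, like the except branch
            return None
        return (key + '.stego', items)

    rows = [('a.jpg', [1]), ('b.jpg', []), ('c.jpg', [2])]
    # Equivalent of rdd.map(embed).filter(lambda x: x != None):
    ext = [r for r in map(embed, rows) if r is not None]
    combined = rows + ext                  # equivalent of rdd.union(rdd_data_ext)

(x is not None would be the more idiomatic test inside the lambda, but != None behaves identically for these tuple rows.)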
@@ -454,7 +454,10 @@ class DataILSVRC_S(DataDumperBase):
         if readforward:
             self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
 
-        self.rdd_data = self.rdd_data.map(lambda x: SC.rddfeat_ILS(x))
+        self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddfeat_ILS(items))
+
+        # print self.rdd_data.collect()[0]
+        # return
 
         if not writeback:
             return self.rdd_data
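Switching from map to mapValues pairs with the new rddfeat_ILS(items, ...) signature in mspark/SC.py below: mapValues hands only the value of each (key, value) pair to the function and keeps the key (and the partitioner) untouched. A small PySpark sketch, assuming a local Spark installation:

    from pyspark import SparkContext

    sc = SparkContext('local', 'mapValues-demo')
    rdd = sc.parallelize([('a.jpg', ['bytes']), ('b.jpg', ['bytes'])])
    # mapValues keeps each key, so the mapped function no longer has to
    # unpack row[0]/row[1] itself.
    print rdd.mapValues(lambda items: items + ['desc']).collect()
    # [('a.jpg', ['bytes', 'desc']), ('b.jpg', ['bytes', 'desc'])]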
mspark/SC.py
@@ -56,9 +56,16 @@ def rddparse_data_ILS(raw_row):
 
 
 def rddparse_all_ILS(raw_row):
+    """
+    Deprecated
+    """
     key = raw_row[0]
     items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
+
+    # @TODO
+    # N.B. "ValueError: No JSON object could be decoded" because the spark-hbase IO is based on strings.
     data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]]
+
     return (key, data)
 
 
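The new comment flags a real constraint: the spark-hbase IO used here moves cell values around as escaped text, so binary JPEG bytes only survive the round trip via the decode('unicode-escape').encode('latin-1') dance above, and any cell that is not valid JSON makes json.loads raise "ValueError: No JSON object could be decoded". A Python 2 sketch of why that round trip restores the original bytes, assuming the connector escapes them the way the code above implies:

    # Python 2: the connector hands back the cell as escaped text;
    # unicode-escape undoes the escaping, and latin-1 maps code points
    # 0-255 back to the original bytes one-to-one.
    raw = '\\xff\\xd8\\xff'   # escaped JPEG magic number, as text
    restored = raw.decode('unicode-escape').encode('latin-1')
    assert restored == '\xff\xd8\xff'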
@@ -145,17 +152,15 @@ def _get_feat(image, feattype='ibd', **kwargs):
     return desc
 
 
-def rddfeat_ILS(row, feattype='ibd', **kwargs):
-    items = row[1]
-    capacity, rate, chosen = int(items[4]), float(items[6]), int(items[7])
+def rddfeat_ILS(items, feattype='ibd', **kwargs):
     try:
         tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
         tmpf_src.write(items[0])
         tmpf_src.seek(0)
 
         desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
-
-        return (row[0], row[1].append(desc))
+        # print 'desccccccccccccccccccc', desc
+        return items + [desc]
 
     except Exception as e:
         print e
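The old return line was the real bug here: list.append mutates in place and returns None, so (row[0], row[1].append(desc)) always produced (key, None) and silently dropped the feature. items + [desc] builds and returns a new list instead. (The deleted capacity/rate/chosen unpacking was unused in this function.) A two-assert demonstration:

    items = ['data', 'rate']
    broken = items.append('desc')        # mutates items, returns None
    assert broken is None
    fixed = ['data', 'rate'] + ['desc']  # builds and returns a new list
    assert fixed == ['data', 'rate', 'desc']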
@@ -174,6 +179,8 @@ def format_out(row, cols):
     """
     puts = []
     key = row[0]
+    if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
+        print row
     for data, col in zip(row[1], cols):
         puts.append((key, [key] + col + [str(data)]))
     return puts
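The added print is a targeted debug trace: keying it to a single known image hash lets one record be followed through format_out without flooding the executor logs. For context, a hypothetical illustration of the puts this function builds; the shapes of row and cols below are assumptions for the example, not the real layout from SC.py:

    # Hypothetical shapes, for illustration only.
    row = ('04650c488a2b163ca8a1f52da6022f03.jpg', ['<jpeg bytes>', '0.1'])
    cols = [['cf_pic', 'data'], ['cf_info', 'rate']]
    puts = [(row[0], [row[0]] + col + [str(data)])
            for data, col in zip(row[1], cols)]
    # puts[0] == ('0465...jpg', ['0465...jpg', 'cf_pic', 'data', '<jpeg bytes>'])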
res/toembed
test/test_data.py
@@ -72,9 +72,9 @@ def test_ILSVRC_S():
 
     # dils.format()
 
-    # dils._extract_data(mode='spark', writeback=True)
-    dils._embed_data(mode='spark', rate=0.1, readforward=True, writeback=True)
-    # dils._extract_feat( mode='spark', feattype='ibd', readforward=False, writeback=False)
+    dils._extract_data(mode='spark', writeback=False)
+    dils._embed_data(mode='spark', rate=0.1, readforward=False, writeback=False)
+    dils._extract_feat( mode='spark', feattype='ibd', readforward=False, writeback=True)
 
 
 if __name__ == '__main__':
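With this change the test drives the full extract -> embed -> feature chain as one Spark job: readforward=False makes each stage consume the self.rdd_data left behind by the previous one instead of re-reading HBase, and only the final _extract_feat call sets writeback=True, so there is a single HBase write at the end rather than a read/write round trip per stage.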