diff --git a/mdata/ILSVRC_S.py b/mdata/ILSVRC_S.py
index 3d7216d..6c0000d 100644
--- a/mdata/ILSVRC_S.py
+++ b/mdata/ILSVRC_S.py
@@ -361,7 +361,7 @@ class DataILSVRC_S(DataDumperBase):
         if readforward:
             self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
 
-        rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x))
+        rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x)).filter(lambda x: x != None)
         self.rdd_data = self.rdd_data.union(rdd_data_ext)
 
         if not writeback:
@@ -454,7 +454,10 @@ class DataILSVRC_S(DataDumperBase):
         if readforward:
             self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
 
-        self.rdd_data = self.rdd_data.map(lambda x: SC.rddfeat_ILS(x))
+        self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddfeat_ILS(items))
+
+        # print self.rdd_data.collect()[0]
+        # return
 
         if not writeback:
             return self.rdd_data
diff --git a/mspark/SC.py b/mspark/SC.py
index 0cef3a5..486d537 100644
--- a/mspark/SC.py
+++ b/mspark/SC.py
@@ -56,9 +56,16 @@ def rddparse_data_ILS(raw_row):
 
 
 def rddparse_all_ILS(raw_row):
+    """
+    Deprecated
+    """
     key = raw_row[0]
     items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
+
+    # @TODO
+    # N.B. "ValueError: No JSON object could be decoded" happens because the spark-hbase IO is based on strings.
     data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]]
+
     return (key, data)
 
 
@@ -145,17 +152,15 @@ def _get_feat(image, feattype='ibd', **kwargs):
     return desc
 
 
-def rddfeat_ILS(row, feattype='ibd', **kwargs):
-    items = row[1]
-    capacity, rate, chosen = int(items[4]), float(items[6]), int(items[7])
+def rddfeat_ILS(items, feattype='ibd', **kwargs):
     try:
         tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
         tmpf_src.write(items[0])
         tmpf_src.seek(0)
 
         desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
-
-        return (row[0], row[1].append(desc))
+        # print 'desccccccccccccccccccc',desc
+        return items + [desc]
     except Exception as e:
         print e
 
@@ -174,6 +179,8 @@ def format_out(row, cols):
     """
     puts = []
     key = row[0]
+    if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
+        print row
     for data, col in zip(row[1], cols):
         puts.append((key, [key] + col + [str(data)]))
     return puts
diff --git a/res/toembed b/res/toembed
deleted file mode 100644
index 683f76e..0000000
--- a/res/toembed
+++ /dev/null
@@ -1,2 +0,0 @@
-3. A segment descriptor is a data-structure entry in the GDT or LDT that provides the processor with the location and size of a segment, along with access-control status information. Each segment descriptor is 8 bytes long and contains three main fields: the segment base address, the segment limit, and the segment attributes. Depending on the expansion-direction flag E in the segment type, the processor uses the segment limit (Limit) in two different ways. For an expand-up segment, the offset in a logical address may range from 0 to the segment limit; an offset greater than the limit raises a general-protection exception. For an expand-down segment, the meaning of the segment limit is reversed.
-(Why is the segment limit 20 bits? 4GB)
diff --git a/res/toembed.bak b/res/toembed.bak
new file mode 100644
index 0000000..683f76e
--- /dev/null
+++ b/res/toembed.bak
@@ -0,0 +1,2 @@
+3. A segment descriptor is a data-structure entry in the GDT or LDT that provides the processor with the location and size of a segment, along with access-control status information. Each segment descriptor is 8 bytes long and contains three main fields: the segment base address, the segment limit, and the segment attributes. Depending on the expansion-direction flag E in the segment type, the processor uses the segment limit (Limit) in two different ways. For an expand-up segment, the offset in a logical address may range from 0 to the segment limit; an offset greater than the limit raises a general-protection exception. For an expand-down segment, the meaning of the segment limit is reversed.
+(Why is the segment limit 20 bits? 4GB)
diff --git a/test/test_data.py b/test/test_data.py
index 8e32ca6..83dc610 100755
--- a/test/test_data.py
+++ b/test/test_data.py
@@ -72,9 +72,9 @@ def test_ILSVRC_S():
 
     # dils.format()
 
-    # dils._extract_data(mode='spark', writeback=True)
-    dils._embed_data(mode='spark', rate=0.1, readforward=True, writeback=True)
-    # dils._extract_feat( mode='spark', feattype='ibd', readforward=False, writeback=False)
+    dils._extract_data(mode='spark', writeback=False)
+    dils._embed_data(mode='spark', rate=0.1, readforward=False, writeback=False)
+    dils._extract_feat( mode='spark', feattype='ibd', readforward=False, writeback=True)
 
 
 if __name__ == '__main__':
--
libgit2 0.21.2
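
Note on the _embed_data hunk: the new .filter(...) after the map suggests that SC.rddembed_ILS returns None for rows it cannot embed, and unioning those None records back into the RDD would break downstream stages. A minimal sketch of the pattern, with a toy embed() standing in for SC.rddembed_ILS (hypothetical, not code from this repo); is not None is the idiomatic spelling of the != None test used in the diff:

    from pyspark import SparkContext

    sc = SparkContext(appName='filter-none-sketch')
    rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])

    # embed() stands in for SC.rddembed_ILS: it returns None on failure.
    def embed(kv):
        key, val = kv
        return (key + '_ext', val * 10) if val % 2 else None

    # Drop the None results before the union so the RDD stays homogeneous.
    ext = rdd.map(embed).filter(lambda x: x is not None)
    combined = rdd.union(ext)
    print(combined.collect())
    # [('a', 1), ('b', 2), ('c', 3), ('a_ext', 10), ('c_ext', 30)]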
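Note on the _extract_feat hunk: the switch from map to mapValues matches the new rddfeat_ILS signature, which now receives only the value list. mapValues applies the function to the value of each (key, value) record while leaving the key (and any partitioner) untouched. A sketch of the equivalence under that assumption, using a toy value transform:

    from pyspark import SparkContext

    sc = SparkContext(appName='mapvalues-sketch')
    rdd = sc.parallelize([('k1', ['img', 'tag']), ('k2', ['img2'])])

    # With map, the lambda must unpack and rebuild the whole (key, value) tuple ...
    by_map = rdd.map(lambda kv: (kv[0], kv[1] + ['desc']))
    # ... with mapValues, only the value is passed in and the key survives as-is.
    by_mapvalues = rdd.mapValues(lambda items: items + ['desc'])

    assert by_map.collect() == by_mapvalues.collect()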
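Note on the rddfeat_ILS return value: the removed return (row[0], row[1].append(desc)) always produced (key, None), because list.append mutates the list in place and returns None; items + [desc] builds and returns a new list instead. A short demonstration of the pitfall (illustrative values only):

    items = ['jpeg_bytes', 'tag']
    # append mutates in place and returns None -- the old code returned this None
    assert items.append('desc') is None
    # concatenation returns a new list, which is what the fixed code returns
    assert items + ['desc2'] == ['jpeg_bytes', 'tag', 'desc', 'desc2']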