def delete_table(self, table_name=None, disable=True):
    """
    Drop an HBase table through the object's happybase connection.

    input:
        table_name: name of the table to drop; when None, self.table_name
                    is used.
        disable:    forwarded unchanged as the second argument of the
                    connection's delete_table() call.
    output:
        False if the table is not listed by connection.tables(),
        True once the delete call has returned.
    raises:
        Whatever the connection's delete_table() raises; a short notice
        is printed before the exception is re-raised.
    """
    if table_name is None:
        table_name = self.table_name

    # Open the connection lazily and cache it on the instance for reuse.
    if self.connection is None:
        self.connection = happybase.Connection('HPC-server')

    # Nothing to delete: report it instead of letting the call fail.
    if table_name not in self.connection.tables():
        return False

    try:
        self.connection.delete_table(table_name, disable)
    except Exception:
        # Parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print('Exception when deleting table.')
        raise
    return True
@@ -256,7 +275,7 @@ class DataILSVRC_S(DataDumperBase): if not writeback: return self.rdd_data else: - self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols) + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=True) else: raise Exception("Unknown mode!") @@ -361,13 +380,13 @@ class DataILSVRC_S(DataDumperBase): if readforward: self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False) - rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x)).filter(lambda x: x != None) + rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=rate)).filter(lambda x: x != None) self.rdd_data = self.rdd_data.union(rdd_data_ext) if not writeback: return self.rdd_data else: - self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols) + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=True) else: raise Exception("Unknown mode!") @@ -462,7 +481,7 @@ class DataILSVRC_S(DataDumperBase): if not writeback: return self.rdd_data else: - self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols) + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=False) else: diff --git a/mspark/SC.py b/mspark/SC.py index 486d537..04f5749 100644 --- a/mspark/SC.py +++ b/mspark/SC.py @@ -99,7 +99,7 @@ def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None): tmpf.close() -def rddembed_ILS(row): +def rddembed_ILS(row, rate=None): """ input: e.g. 
def format_out(row, cols, withdata=False):
    """
    Expand one (key, values) row into a list of per-cell put tuples.

    input:
        row:      (key, values), e.g. ('row1', [1, 3400, 'hello'])
        cols:     sequence of column descriptors, each a list, paired
                  positionally with the entries of values
        withdata: when False (default) the first value/column pair is
                  skipped; when True every pair is emitted.
                  NOTE(review): the first column is presumably the raw
                  image payload -- confirm against the callers.
    output:
        [(key, [key] + col + [str(value)]), ...] in column order; pairing
        stops at the shorter of the two sequences.
    """
    key = row[0]
    values, columns = row[1], cols
    if not withdata:
        # Drop the leading value together with its column descriptor.
        values, columns = values[1:], columns[1:]
    return [(key, [key] + col + [str(data)])
            for data, col in zip(values, columns)]
[["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]] @@ -264,7 +268,7 @@ class Sparker(object): else: rdd_data = data - rdd_data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset( + rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset( conf=hconf, keyConverter=hparams["writeKeyConverter"], valueConverter=hparams["writeValueConverter"]) diff --git a/test/test_data.py b/test/test_data.py index 83dc610..775eb75 100755 --- a/test/test_data.py +++ b/test/test_data.py @@ -59,9 +59,10 @@ def test_ILSVRC(): def test_ILSVRC_S(): timer = Timer() - # dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2') - # dil.format() - # dil.store_img() + dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2') + dil.delete_table() + dil.format() + dil.store_img() dils = ILSVRC_S.DataILSVRC_S(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2') -- libgit2 0.21.2