Commit 2c50777423da8d9d4963cfccf8af1291eb029c04

Authored by Chunk
1 parent 0fbc087e
Exists in master and in 1 other branch refactor

staged.

Showing 1 changed file with 32 additions and 1 deletions   Show diff stats
mdata/ILSVRC_S.py
@@ -410,7 +410,38 @@ class DataILSVRC_S(DataDumperBase): @@ -410,7 +410,38 @@ class DataILSVRC_S(DataDumperBase):
410 raise 410 raise
411 411
412 elif mode == 'spark': 412 elif mode == 'spark':
413 - pass 413 + if self.sparkcontex == None:
  414 + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC',
  415 + master='spark://HPC-server:7077')
  416 +
  417 + cols = ['cf_pic:data',
  418 + 'cf_info:width',
  419 + 'cf_info:height',
  420 + 'cf_info:size',
  421 + 'cf_info:capacity',
  422 + 'cf_info:quality',
  423 + 'cf_info:rate',
  424 + 'cf_tag:chosen',
  425 + 'cf_tag:class']
  426 +
  427 + if readforward:
  428 + self.dict_data = {}
  429 +
  430 + for key, data in self.table.scan(columns=cols):
  431 + data = [data[k] for k in cols]
  432 + self.dict_data[key] = data
  433 + self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_all,
  434 + collect=False)
  435 +
  436 + rdd_data_ext = self.rdd_data.map(lambda x: self._rdd_embed(x))
  437 + self.rdd_data = self.rdd_data.union(rdd_data_ext)
  438 +
  439 + if not writeback:
  440 + return self.dict_data
  441 + else:
  442 + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols)
  443 +
  444 +
414 else: 445 else:
415 raise Exception("Unknown mode!") 446 raise Exception("Unknown mode!")
416 447