From 2c50777423da8d9d4963cfccf8af1291eb029c04 Mon Sep 17 00:00:00 2001 From: Chunk Date: Fri, 10 Apr 2015 17:36:21 +0800 Subject: [PATCH] staged. --- mdata/ILSVRC_S.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/mdata/ILSVRC_S.py b/mdata/ILSVRC_S.py index c16f3c2..fbcd202 100644 --- a/mdata/ILSVRC_S.py +++ b/mdata/ILSVRC_S.py @@ -410,7 +410,38 @@ class DataILSVRC_S(DataDumperBase): raise elif mode == 'spark': - pass + if self.sparkcontex == None: + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC', + master='spark://HPC-server:7077') + + cols = ['cf_pic:data', + 'cf_info:width', + 'cf_info:height', + 'cf_info:size', + 'cf_info:capacity', + 'cf_info:quality', + 'cf_info:rate', + 'cf_tag:chosen', + 'cf_tag:class'] + + if readforward: + self.dict_data = {} + + for key, data in self.table.scan(columns=cols): + data = [data[k] for k in cols] + self.dict_data[key] = data + self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_all, + collect=False) + + rdd_data_ext = self.rdd_data.map(lambda x: self._rdd_embed(x)) + self.rdd_data = self.rdd_data.union(rdd_data_ext) + + if not writeback: + return self.dict_data + else: + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols) + + else: raise Exception("Unknown mode!") -- libgit2 0.21.2