diff --git a/mdata/ILSVRC_S.py b/mdata/ILSVRC_S.py
index 3c2a148..c16f3c2 100644
--- a/mdata/ILSVRC_S.py
+++ b/mdata/ILSVRC_S.py
@@ -51,10 +51,12 @@ class DataILSVRC_S(DataDumperBase):
         self.category = category
 
         self.dict_data = {}
+        self.rdd_data = None
 
         self.table_name = self.base_dir.strip('/').split('/')[-1] + '-' + self.category
 
         self.sparkcontex = None
+        self.steger = F5.F5(sample_key, 1)
 
     def get_table(self):
         if self.table != None:
@@ -119,6 +121,64 @@ class DataILSVRC_S(DataDumperBase):
 
         return desc
 
+    def _rdd_parse_data(self, raw_row):
+        """
+        input: (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:<raw jpeg bytes>--%--cf_tag:hog:True')
+        return: (u'key0', [0.056273,...])
+
+        In fact we could also use mapValues.
+        """
+        key = raw_row[0]
+        items = raw_row[1].split('--%--')
+        data = json.loads(items[0].split(':')[-1])
+        return (key, data)
+
+    def _rdd_parse_all(self, raw_row):
+        key = raw_row[0]
+        items = raw_row[1].split('--%--')
+        data = [json.loads(item.split(':')[-1]) for item in items]
+        return (key, data)
+
+    def _rdd_embed(self, row):
+        """
+        input:
+            e.g. row = ('row1', [1, 3400, 'hello'])
+        return:
+            newrow = ('row2', [34, 5400, 'embedded'])
+        """
+        items = row[1]
+        capacity, rate, chosen = items[4], items[6], items[7]
+        if chosen == 0:
+            return None
+        try:
+            tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
+            tmpf_src.write(items[0])
+            tmpf_src.seek(0)
+            tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
+
+            if rate == None:
+                embed_rate = self.steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
+                                                        tmpf_dst.name)
+            else:
+                assert (rate >= 0 and rate < 1)
+                # print capacity
+                hidden = np.random.bytes(int(int(capacity) * rate) / 8)
+                embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
+
+            tmpf_dst.seek(0)
+            raw = tmpf_dst.read()
+            index = md5(raw).hexdigest()
+
+            return (index + '.jpg', [raw] + self._get_info(raw, embed_rate, 0, 1))
+
+        except Exception as e:
+            print e
+            raise
+        finally:
+            tmpf_src.close()
+            tmpf_dst.close()
+
+
     def _extract_data(self, mode='hbase', writeback=False):
         """
         Get info straight out of the image data.
@@ -155,13 +215,34 @@ class DataILSVRC_S(DataDumperBase):
 
         elif mode == 'spark':
-            pass
+            if self.sparkcontex == None:
+                self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC',
+                                              master='spark://HPC-server:7077')
+
+            cols = ['cf_pic:data',
+                    'cf_info:width',
+                    'cf_info:height',
+                    'cf_info:size',
+                    'cf_info:capacity',
+                    'cf_info:quality',
+                    'cf_info:rate',
+                    'cf_tag:chosen',
+                    'cf_tag:class']
+
+            self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_data,
+                                                        collect=False).mapValues(
+                lambda data: [data] + self._get_info(data))
+
+            if not writeback:
+                return self.rdd_data
+            else:
+                self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols)
+
         else:
             raise Exception("Unknown mode!")
 
     def _embed_data(self, mode='hbase', rate=None, readforward=False, writeback=False):
-        f5 = F5.F5(sample_key, 1)
         if mode == 'hbase':
             if self.table == None:
                 self.table = self.get_table()
@@ -191,13 +272,14 @@ class DataILSVRC_S(DataDumperBase):
                     tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
 
                     if rate == None:
-                        embed_rate = f5.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
-                                                       tmpf_dst.name)
+                        embed_rate = self.steger.embed_raw_data(tmpf_src.name,
+                                                                os.path.join(package_dir, '../res/toembed'),
+                                                                tmpf_dst.name)
                     else:
                         assert (rate >= 0 and rate < 1)
                         # print capacity
                         hidden = np.random.bytes(int(int(imgdata[4]) * rate) / 8)
-                        embed_rate = f5.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
+                        embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
 
                     tmpf_dst.seek(0)
                     raw = tmpf_dst.read()
@@ -235,7 +317,37 @@ class DataILSVRC_S(DataDumperBase):
                     raise
 
         elif mode == 'spark':
-            pass
+            if self.sparkcontex == None:
+                self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC',
+                                              master='spark://HPC-server:7077')
+
+            cols = ['cf_pic:data',
+                    'cf_info:width',
+                    'cf_info:height',
+                    'cf_info:size',
+                    'cf_info:capacity',
+                    'cf_info:quality',
+                    'cf_info:rate',
+                    'cf_tag:chosen',
+                    'cf_tag:class']
+
+            if readforward:
+                self.dict_data = {}
+
+                for key, data in self.table.scan(columns=cols):
+                    data = [data[k] for k in cols]
+                    self.dict_data[key] = data
+            self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_all,
+                                                        collect=False)
+
+            rdd_data_ext = self.rdd_data.map(lambda x: self._rdd_embed(x)).filter(lambda x: x != None)
+            self.rdd_data = self.rdd_data.union(rdd_data_ext)
+
+            if not writeback:
+                return self.rdd_data
+            else:
+                self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols)
+
         else:
            raise Exception("Unknown mode!")
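
Note: the two _rdd_parse_* helpers above assume that read_hbase hands each row over as a single string, with '--%--' between cells and a 'family:qualifier:payload' layout inside each cell. A minimal standalone sketch of that convention (the row key and cell contents below are invented for illustration):

import json

# Toy row in the '--%--' convention that _rdd_parse_data expects.
raw_row = (u'key0', u'cf_feat:hog:[0.056273, 0.81]--%--cf_tag:hog:true')

key = raw_row[0]
items = raw_row[1].split('--%--')
# Caveat: split(':') keeps only the text after the last colon, so a
# payload that itself contains ':' would be truncated here.
data = json.loads(items[0].split(':')[-1])

print((key, data))  # (u'key0', [0.056273, 0.81])
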
diff --git a/mspark/SC.py b/mspark/SC.py
index 74cdc23..430c0e3 100644
--- a/mspark/SC.py
+++ b/mspark/SC.py
@@ -24,6 +24,21 @@ def parse_cv(raw_row):
     return (feat, tag)
 
 
+def format_out(row, cols):
+    """
+    input:
+        e.g. row = ('row1', [1, 3400, 'hello'])
+             cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
+    return:
+        [('row1', ['row1', 'cf_info', 'id', 1]), ('row1', ['row1', 'cf_info', 'size', 3400]), ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
+    """
+    puts = []
+    key = row[0]
+    for data, col in zip(row[1], cols):
+        puts.append((key, [key] + col + [data]))
+    return puts
+
+
 class Sparker(object):
     def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
         load_env()
@@ -76,10 +91,17 @@ class Sparker(object):
         else:
             return hbase_rdd
 
-    def write_hbase(self, table_name, data):
+    def write_hbase(self, table_name, data, fromrdd=False, columns=None):
         """
-        Data Format:
+        Data Format: (Deprecated)
             e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
+
+        Data (from dict):
+            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
+                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
+        Data (from RDD):
+            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
+                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
         """
         hconf = {"hbase.zookeeper.quorum": self.host,
                  "hbase.mapreduce.inputtable": table_name,
@@ -88,8 +110,11 @@ class Sparker(object):
                  "mapreduce.job.output.key.class": hparams["writeKeyClass"],
                  "mapreduce.job.output.value.class": hparams["writeValueClass"],
                  }
+        cols = [col.split(':') for col in columns]
+        if not fromrdd:
+            data = self.sc.parallelize(data.items())
 
-        self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
+        data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset(
             conf=hconf,
             keyConverter=hparams["writeKeyConverter"],
             valueConverter=hparams["writeValueConverter"])
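
For reference, format_out fans one logical row out into one HBase Put tuple per column, which is the shape saveAsNewAPIHadoopDataset consumes downstream. A self-contained sketch using the docstring's example values (format_out is re-defined here rather than imported from mspark.SC, so the snippet runs on its own):

def format_out(row, cols):
    # One output tuple per column: (rowkey, [rowkey, family, qualifier, value]).
    puts = []
    key = row[0]
    for data, col in zip(row[1], cols):
        puts.append((key, [key] + col + [data]))
    return puts

cols = [c.split(':') for c in ['cf_info:id', 'cf_info:size', 'cf_tag:desc']]
for put in format_out(('row1', [1, 3400, 'hello']), cols):
    print(put)
# ('row1', ['row1', 'cf_info', 'id', 1])
# ('row1', ['row1', 'cf_info', 'size', 3400])
# ('row1', ['row1', 'cf_tag', 'desc', 'hello'])
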
diff --git a/scripts/run_spark.sh b/scripts/run_spark.sh
index 0ba2638..3918cf9 100755
--- a/scripts/run_spark.sh
+++ b/scripts/run_spark.sh
@@ -4,6 +4,7 @@
 ##
 ## F**k World!
 ##
+## e.g.
 ##
 ## export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:`hbase classpath`
 ## export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
@@ -27,36 +28,72 @@
 ##   --class "FuckWorld" \
 ##   --args $ARGS
 ##
-##spark-submit \
-##  --driver-memory 1g \
-##  --executor-memory 1g \
-##  --executor-cores 2 \
-##  --master spark://HPC-server:7077 \
-##  --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
-##  $APP_JAR $ARGS
+##
+## spark-submit \
+##   --driver-memory 1g \
+##   --executor-memory 1g \
+##   --executor-cores 2 \
+##   --master spark://HPC-server:7077 \
+##   --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
+##   $APP_JAR $ARGS
+##
+##
+## spark-submit \
+##   --driver-memory 1g \
+##   --executor-memory 2g \
+##   --executor-cores 2 \
+##   --master spark://HPC-server:7077 \
+##   --jars hdfs://HPC-server:9000/user/spark/share/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
+##   --py-files $COMPRESSED \
+##   $APP $ARGS
 ########################################################################################
 
 source /home/hadoop/.zshrc
 v env1
 
+####################################################################
+## environment variables
+####################################################################
 export PYSPARK_PYTHON=/home/hadoop/.virtualenvs/env1/bin/python
+
 export SPARK_CLASSPATH=`hbase classpath`
+
 export SPARK_JAR=hdfs://HPC-server:9000/user/spark/share/lib/spark-assembly-1.2.0-hadoop2.5.1.jar
 
-#COMPRESSED=/home/hadoop/workspace/pycharm/test/ImageR.zip
-#  --py-files $COMPRESSED \
-COMPRESSED=/home/hadoop/workspace/pycharm/tmp/ImageR/mdata.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mfeat.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mjpeg.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/msteg.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mmodel.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mspark.zip
+
+
+####################################################################
+## additional files list
+####################################################################
+JARS=$SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar
+
+BASE_DIR=/home/hadoop/workspace/pycharm/test
+
+PACKAGE=ImageR
+
+[ -f $BASE_DIR/$PACKAGE.zip ] && rm $BASE_DIR/$PACKAGE.zip
+
+zip -r $PACKAGE.zip $PACKAGE/ -x $PACKAGE/.git\* $PACKAGE/.idea\*
+
+COMPRESSED=$BASE_DIR/$PACKAGE.zip
 
 APP=test_spark.py
 #APP=test_model.py
-ARGS=
+ARGS=""
+
+
+
+
+####################################################################
+## submit script
+####################################################################
 spark-submit \
     --driver-memory 1g \
     --executor-memory 2g \
     --executor-cores 2 \
     --master spark://HPC-server:7077 \
-    --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
+    --jars $JARS \
     --py-files $COMPRESSED \
     $APP $ARGS
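
The script now ships a single ImageR.zip through --py-files instead of six per-module zips. Spark simply prepends such an archive to sys.path on every executor, so whether the package will resolve can be checked locally with plain zipimport. A sketch under the assumption that ImageR/ is a real package (i.e. contains an __init__.py), using the path from the script above:

import sys

# Same lookup the executors perform for a --py-files archive.
sys.path.insert(0, '/home/hadoop/workspace/pycharm/test/ImageR.zip')

import ImageR
print(ImageR.__file__)  # should point inside ImageR.zip
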
diff --git a/test/test_data.py b/test/test_data.py
index feb7d1d..a29938b 100755
--- a/test/test_data.py
+++ b/test/test_data.py
@@ -59,9 +59,9 @@ def test_ILSVRC():
 
 def test_ILSVRC_S():
     timer = Timer()
 
-    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
-    dil.format()
-    dil.store_img()
+    # dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
+    # dil.format()
+    # dil.store_img()
 
     dils = ILSVRC_S.DataILSVRC_S(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
 
@@ -70,11 +70,15 @@ def test_ILSVRC_S():
     # dils._embed_data(mode='hbase', rate=0.1, readforward=True, writeback=True)
     # dils._extract_feat( mode='hbase', feattype='ibd', readforward=True, writeback=True)
 
-    dils.format()
+    # dils.format()
+
+    dils._extract_data(mode='spark', writeback=False)
 
 
 if __name__ == '__main__':
     # test_MSR()
     # test_CV()
-    test_ILSVRC()
+    # test_ILSVRC()
+    test_ILSVRC_S()
+
+    print 'hello'
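
With the spark branches filled in, the test now exercises _extract_data end to end. A sketch of driving the same path interactively, using the same base_dir and category as the test; since writeback=False, the method returns the RDD instead of writing it back to HBase (the row layout shown in the comment is an assumption based on the cols list in the diff):

from mdata import ILSVRC_S

dils = ILSVRC_S.DataILSVRC_S(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val',
                             category='Train_2')

rdd = dils._extract_data(mode='spark', writeback=False)
# Roughly: (row_key, [raw_jpeg] + info_fields)
print(rdd.first())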