Commit 0fbc087e1be3b4c2acc8627d1ef1c61d10961e5e

Authored by Chunk
1 parent 35cf2e3a
Exists in master and in 1 other branch: refactor

staged.

mdata/ILSVRC_S.py
... ... @@ -51,10 +51,12 @@ class DataILSVRC_S(DataDumperBase):
51 51 self.category = category
52 52  
53 53 self.dict_data = {}
  54 + self.rdd_data = None
54 55  
55 56 self.table_name = self.base_dir.strip('/').split('/')[-1] + '-' + self.category
56 57 self.sparkcontex = None
57 58  
  59 + self.steger = F5.F5(sample_key, 1)
58 60  
59 61 def get_table(self):
60 62 if self.table != None:
... ... @@ -119,6 +121,64 @@ class DataILSVRC_S(DataDumperBase):
119 121  
120 122 return desc
121 123  
  124 + def _rdd_parse_data(self, raw_row):
  125 + """
  126 + input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
  127 + return: (u'key0', [0.056273,...])
  128 +
  129 + Note: this could also be implemented with mapValues.
  130 + """
  131 + key = raw_row[0]
  132 + items = raw_row[1].split('--%--')
  133 + data = json.loads(items[0].split(':')[-1])
  134 + return (key, data)
  135 +
  136 + def _rdd_parse_all(self, raw_row):
  137 + key = raw_row[0]
  138 + items = raw_row[1].split('--%--')
  139 + data = [json.loads(item.split(':')[-1]) for item in items]
  140 + return (key, data)
  141 +
  142 + def _rdd_embed(self, row):
  143 + """
  144 + input:
  145 + e.g. row = (key, [data, width, height, size, capacity, quality, rate, chosen, class])
  146 + return:
  147 + (md5_of_embedded + '.jpg', [embedded_raw] + info), or None if the image is not chosen
  148 + """
  149 + items = row[1]
  150 + capacity, rate, chosen = items[4], items[6], items[7]
  151 + if chosen == 0:
  152 + return None
  153 + try:
  154 + tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  155 + tmpf_src.write(items[0])
  156 + tmpf_src.seek(0)
  157 + tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  158 +
  159 + if rate == None:
  160 + embed_rate = self.steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
  161 + tmpf_dst.name)
  162 + else:
  163 + assert (rate >= 0 and rate < 1)
  164 + # print capacity
  165 + hidden = np.random.bytes(int(int(capacity) * rate) / 8)
  166 + embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
  167 +
  168 + tmpf_dst.seek(0)
  169 + raw = tmpf_dst.read()
  170 + index = md5(raw).hexdigest()
  171 +
  172 + return (index + '.jpg', [raw] + self._get_info(raw, embed_rate, 0, 1))
  173 +
  174 + except Exception as e:
  175 + print e
  176 + raise
  177 + finally:
  178 + tmpf_src.close()
  179 + tmpf_dst.close()
  180 +
  181 +
122 182 def _extract_data(self, mode='hbase', writeback=False):
123 183 """
124 184 Get info barely out of image data.
... ... @@ -155,13 +215,34 @@ class DataILSVRC_S(DataDumperBase):
155 215  
156 216  
157 217 elif mode == 'spark':
158   - pass
  218 + if self.sparkcontex == None:
  219 + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC',
  220 + master='spark://HPC-server:7077')
  221 +
  222 + cols = ['cf_pic:data',
  223 + 'cf_info:width',
  224 + 'cf_info:height',
  225 + 'cf_info:size',
  226 + 'cf_info:capacity',
  227 + 'cf_info:quality',
  228 + 'cf_info:rate',
  229 + 'cf_tag:chosen',
  230 + 'cf_tag:class']
  231 +
  232 + self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_data,
  233 + collect=False).mapValues(
  234 + lambda data: [data] + self._get_info(data))
  235 +
  236 + if not writeback:
  237 + return self.rdd_data
  238 + else:
  239 + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols)
  240 +
159 241 else:
160 242 raise Exception("Unknown mode!")
161 243  
162 244  
163 245 def _embed_data(self, mode='hbase', rate=None, readforward=False, writeback=False):
164   - f5 = F5.F5(sample_key, 1)
165 246 if mode == 'hbase':
166 247 if self.table == None:
167 248 self.table = self.get_table()
... ... @@ -191,13 +272,14 @@ class DataILSVRC_S(DataDumperBase):
191 272 tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
192 273  
193 274 if rate == None:
194   - embed_rate = f5.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
195   - tmpf_dst.name)
  275 + embed_rate = self.steger.embed_raw_data(tmpf_src.name,
  276 + os.path.join(package_dir, '../res/toembed'),
  277 + tmpf_dst.name)
196 278 else:
197 279 assert (rate >= 0 and rate < 1)
198 280 # print capacity
199 281 hidden = np.random.bytes(int(int(imgdata[4]) * rate) / 8)
200   - embed_rate = f5.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
  282 + embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
201 283  
202 284 tmpf_dst.seek(0)
203 285 raw = tmpf_dst.read()
... ... @@ -235,7 +317,37 @@ class DataILSVRC_S(DataDumperBase):
235 317 raise
236 318  
237 319 elif mode == 'spark':
238   - pass
  320 + if self.sparkcontex == None:
  321 + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageILSVRC',
  322 + master='spark://HPC-server:7077')
  323 +
  324 + cols = ['cf_pic:data',
  325 + 'cf_info:width',
  326 + 'cf_info:height',
  327 + 'cf_info:size',
  328 + 'cf_info:capacity',
  329 + 'cf_info:quality',
  330 + 'cf_info:rate',
  331 + 'cf_tag:chosen',
  332 + 'cf_tag:class']
  333 +
  334 + if readforward:
  335 + self.dict_data = {}
  336 +
  337 + for key, data in self.table.scan(columns=cols):
  338 + data = [data[k] for k in cols]
  339 + self.dict_data[key] = data
  340 + self.rdd_data = self.sparkcontex.read_hbase(self.table_name, func=self._rdd_parse_all,
  341 + collect=False)
  342 +
  343 + rdd_data_ext = self.rdd_data.map(lambda x: self._rdd_embed(x))
  344 + self.rdd_data = self.rdd_data.union(rdd_data_ext)
  345 +
  346 + if not writeback:
  347 + return self.dict_data
  348 + else:
  349 + self.sparkcontex.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols)
  350 +
239 351 else:
240 352 raise Exception("Unknown mode!")
241 353  
... ...
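
The new _rdd_parse_data / _rdd_parse_all helpers undo the '--%--' join applied to the requested HBase cells: each cell has the form 'family:qualifier:payload', and only the payload after the last ':' is kept and JSON-decoded. A minimal stand-alone sketch of that parsing step, with an illustrative row (names and sample data are hypothetical; no Spark required):

    import json

    def parse_row(raw_row):
        # raw_row is assumed to be a (key, joined-cells) pair, as in the docstring example above
        key = raw_row[0]
        cells = raw_row[1].split('--%--')
        values = [json.loads(cell.split(':')[-1]) for cell in cells]
        return (key, values)

    sample = (u'key0', u'cf_feat:hog:[0.056273, 0.01]--%--cf_tag:hog:true')
    print(parse_row(sample))   # (u'key0', [[0.056273, 0.01], True])
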
mspark/SC.py
... ... @@ -24,6 +24,21 @@ def parse_cv(raw_row):
24 24 return (feat, tag)
25 25  
26 26  
  27 +def format_out(row, cols):
  28 + """
  29 + input:
  30 + e.g. row =('row1',[1,3400,'hello'])
  31 + cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
  32 + return:
  33 + [('row1',['row1', 'cf_info', 'id', 1]),('row1',['row1', 'cf_info', 'size', 3400]),('row1',['row1', 'cf_tag', 'desc', 'hello'])]
  34 + """
  35 + puts = []
  36 + key = row[0]
  37 + for data, col in zip(row[1], cols):
  38 + puts.append((key, [key] + col + [data]))
  39 + return puts
  40 +
  41 +
27 42 class Sparker(object):
28 43 def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
29 44 load_env()
... ... @@ -76,10 +91,17 @@ class Sparker(object):
76 91 else:
77 92 return hbase_rdd
78 93  
79   - def write_hbase(self, table_name, data):
  94 + def write_hbase(self, table_name, data, fromrdd=False, columns=None):
80 95 """
81   - Data Format:
  96 + Data Format: (Deprecated)
82 97 e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
  98 +
   99 + Data (from dict):
  100 + e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
  101 + cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
  102 + Data (from RDD):
  103 + e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
  104 + cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
83 105 """
84 106 hconf = {"hbase.zookeeper.quorum": self.host,
85 107 "hbase.mapreduce.inputtable": table_name,
... ... @@ -88,8 +110,11 @@ class Sparker(object):
88 110 "mapreduce.job.output.key.class": hparams["writeKeyClass"],
89 111 "mapreduce.job.output.value.class": hparams["writeValueClass"],
90 112 }
  113 + cols = [col.split(':') for col in columns]
  114 + if not fromrdd:
  115 + rdd_data = self.sc.parallelize(data)
91 116  
92   - self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
  117 + rdd_data.flatMap(lambda x: format_out(x, cols)).saveAsNewAPIHadoopDataset(
93 118 conf=hconf,
94 119 keyConverter=hparams["writeKeyConverter"],
95 120 valueConverter=hparams["writeValueConverter"])
... ...
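
write_hbase now fans each (key, [values]) row out into one put per column via format_out before calling saveAsNewAPIHadoopDataset. A pure-Python sketch of that fan-out (the column names and sample row are illustrative only); when fromrdd=True, data is assumed to already be an RDD of such (key, [values]) pairs:

    def format_out(row, cols):
        # one HBase put per column: (rowkey, [rowkey, family, qualifier, value])
        key = row[0]
        return [(key, [key] + col + [data]) for data, col in zip(row[1], cols)]

    columns = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
    cols = [c.split(':') for c in columns]
    for put in format_out(('row1', [1, 3400, 'hello']), cols):
        print(put)
    # ('row1', ['row1', 'cf_info', 'id', 1])
    # ('row1', ['row1', 'cf_info', 'size', 3400])
    # ('row1', ['row1', 'cf_tag', 'desc', 'hello'])
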
scripts/run_spark.sh
... ... @@ -4,6 +4,7 @@
4 4 ##
5 5 ## F**k World!
6 6 ##
  7 +## e.g.
7 8 ##
8 9 ## export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:`hbase classpath`
9 10 ## export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
... ... @@ -27,36 +28,72 @@
27 28 ## --class "FuckWorld" \
28 29 ## --args $ARGS
29 30 ##
30   -##spark-submit \
31   -## --driver-memory 1g \
32   -## --executor-memory 1g \
33   -## --executor-cores 2 \
34   -## --master spark://HPC-server:7077 \
35   -## --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
36   -## $APP_JAR $ARGS
  31 +##
  32 +## spark-submit \
  33 +## --driver-memory 1g \
  34 +## --executor-memory 1g \
  35 +## --executor-cores 2 \
  36 +## --master spark://HPC-server:7077 \
  37 +## --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
  38 +## $APP_JAR $ARGS
  39 +##
  40 +##
  41 +## spark-submit \
  42 +## --driver-memory 1g \
  43 +## --executor-memory 2g \
  44 +## --executor-cores 2 \
  45 +## --master spark://HPC-server:7077 \
  46 +## --jars hdfs://HPC-server:9000/user/spark/share/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
  47 +## --py-files $COMPRESSED \
  48 +## $APP $ARGS
37 49 ########################################################################################
38 50  
39 51 source /home/hadoop/.zshrc
40 52 v env1
41 53  
  54 +####################################################################
  55 +## environment variables
  56 +####################################################################
42 57 export PYSPARK_PYTHON=/home/hadoop/.virtualenvs/env1/bin/python
  58 +
43 59 export SPARK_CLASSPATH=`hbase classpath`
  60 +
44 61 export SPARK_JAR=hdfs://HPC-server:9000/user/spark/share/lib/spark-assembly-1.2.0-hadoop2.5.1.jar
45 62  
46   -#COMPRESSED=/home/hadoop/workspace/pycharm/test/ImageR.zip
47   -# --py-files $COMPRESSED \
48   -COMPRESSED=/home/hadoop/workspace/pycharm/tmp/ImageR/mdata.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mfeat.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mjpeg.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/msteg.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mmodel.zip,/home/hadoop/workspace/pycharm/tmp/ImageR/mspark.zip
  63 +
  64 +
  65 +####################################################################
  66 +## additional files list
  67 +####################################################################
  68 +JARS=$SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar
  69 +
  70 +BASE_DIR=/home/hadoop/workspace/pycharm/test
  71 +
  72 +PACKAGE=ImageR
  73 +
  74 +[ -f $BASE_DIR/$PACKAGE.zip ] && rm $BASE_DIR/$PACKAGE.zip
  75 +
  76 +zip -r $PACKAGE.zip $PACKAGE/ -x $PACKAGE/.git\* $PACKAGE/.idea\*
  77 +
  78 +COMPRESSED=$BASE_DIR/$PACKAGE.zip
49 79  
50 80 APP=test_spark.py
51 81 #APP=test_model.py
52   -ARGS=
53 82  
  83 +ARGS=""
  84 +
  85 +
  86 +
  87 +
  88 +####################################################################
  89 +## submit script
  90 +####################################################################
54 91 spark-submit \
55 92 --driver-memory 1g \
56 93 --executor-memory 2g \
57 94 --executor-cores 2 \
58 95 --master spark://HPC-server:7077 \
59   - --jars $SPARK_HOME/lib/spark-examples-1.2.0-hadoop2.5.1.jar \
  96 + --jars $JARS \
60 97 --py-files $COMPRESSED \
61 98 $APP $ARGS
62 99  
... ...
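
With the whole ImageR package zipped and shipped via --py-files, the submitted driver script can import the project code on both the driver and the executors. A hedged sketch of what $APP (test_spark.py) might look like; the exact import prefix depends on how the zip root is laid out, and the call mirrors the spark-mode path exercised in test/test_data.py:

    # hypothetical driver script; assumes the archive root exposes the project's packages
    from mdata import ILSVRC_S

    dils = ILSVRC_S.DataILSVRC_S(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val',
                                 category='Train_2')
    rdd = dils._extract_data(mode='spark', writeback=False)
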
test/test_data.py
... ... @@ -59,9 +59,9 @@ def test_ILSVRC():
59 59 def test_ILSVRC_S():
60 60 timer = Timer()
61 61  
62   - dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
63   - dil.format()
64   - dil.store_img()
  62 + # dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
  63 + # dil.format()
  64 + # dil.store_img()
65 65  
66 66  
67 67 dils = ILSVRC_S.DataILSVRC_S(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Train_2')
... ... @@ -70,11 +70,15 @@ def test_ILSVRC_S():
70 70 # dils._embed_data(mode='hbase', rate=0.1, readforward=True, writeback=True)
71 71 # dils._extract_feat( mode='hbase', feattype='ibd', readforward=True, writeback=True)
72 72  
73   - dils.format()
  73 + # dils.format()
  74 +
  75 + dils._extract_data(mode='spark', writeback=False)
74 76  
75 77 if __name__ == '__main__':
76 78 # test_MSR()
77 79 # test_CV()
78   - test_ILSVRC()
  80 + # test_ILSVRC()
  81 + test_ILSVRC_S()
  82 +
79 83  
80 84 print 'helllo'
... ...