Commit 712861f0014b007d5e6bf52ced1d57d2ff644ecc

Authored by Chunk
1 parent f005aa4a
Exists in refactor

Extract the RDD helper functions from SC into a standalone rdd module.
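
This commit moves the RDD helpers (rddparse_*, rddinfo_ILS, rddembed_ILS / rddembed_ILS_EXT, rddfeat_ILS, rddanalysis_ILS, format_out) out of mspark/SC.py into a new mspark/rdd.py; SC.py keeps the Sparker class and re-exports the helpers via "from .rdd import *", while call sites switch from SC.<helper> to rdd.<helper>. A minimal sketch of the resulting call-site pattern (the table name is hypothetical; the host/master values are the ones appearing in this diff):

    from ..mspark import rdd, SC

    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
                         master='spark://HPC-server:7077')

    # The parser now lives in rdd, not SC.
    rdd_data = sparker.read_hbase('ILSVRC_sample',            # hypothetical table name
                                  func=rdd.rddparse_data_ILS,
                                  collect=False)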

mdata/ILSVRC.py
1 1 __author__ = 'chunk'
2 2  
3 3 from . import *
4   -from ..mfeat import HOG, IntraBlockDiff
5   -from ..mspark import SC
  4 +from ..mfeat import IntraBlockDiff
  5 +from ..mspark import rdd, SC
6 6 from ..common import *
7 7  
8 8 import os, sys
... ... @@ -83,11 +83,11 @@ class DataILSVRC(DataDumperBase):
83 83 pass
84 84  
85 85 def get_feat(self, image, feattype='ibd', **kwargs):
86   - size = kwargs.get('size', (48, 48))
87   -
88   - if feattype == 'hog':
89   - feater = HOG.FeatHOG(size=size)
90   - elif feattype == 'ibd':
  86 + # size = kwargs.get('size', (48, 48))
  87 + #
  88 + # if feattype == 'hog':
  89 + # feater = HOG.FeatHOG(size=size)
  90 + if feattype == 'ibd':
91 91 feater = IntraBlockDiff.FeatIntraBlockDiff()
92 92 else:
93 93 raise Exception("Unknown feature type!")
... ... @@ -99,9 +99,9 @@ class DataILSVRC(DataDumperBase):
99 99  
100 100 def extract_feat(self, feattype='ibd'):
101 101 print "extracting feat..."
102   - if feattype == 'hog':
103   - feater = HOG.FeatHOG(size=(48, 48))
104   - elif feattype == 'ibd':
  102 + # if feattype == 'hog':
  103 + # feater = HOG.FeatHOG(size=(48, 48))
  104 + if feattype == 'ibd':
105 105 feater = IntraBlockDiff.FeatIntraBlockDiff()
106 106 else:
107 107 raise Exception("Unknown feature type!")
... ... @@ -307,7 +307,7 @@ class DataILSVRC(DataDumperBase):
307 307 # cv2.imwrite(os.path.join(base_dir, category + '_crop_cv', name), img_crop)
308 308 # except Exception as e:
309 309 # print '[EXCPT]', e
310   - # pass
  310 + # pass
311 311  
312 312  
313 313 def get_table(self):
... ... @@ -322,10 +322,10 @@ class DataILSVRC(DataDumperBase):
322 322 tables = self.connection.tables()
323 323 if self.table_name not in tables:
324 324 families_compressed = {'cf_pic': dict(compression='LZO'),
325   - 'cf_info': dict(max_versions=10,compression='LZO'),
326   - 'cf_tag': dict(compression='LZO'),
327   - 'cf_feat': dict(compression='LZO'),
328   - }
  325 + 'cf_info': dict(max_versions=10, compression='LZO'),
  326 + 'cf_tag': dict(compression='LZO'),
  327 + 'cf_feat': dict(compression='LZO'),
  328 + }
329 329 families = {'cf_pic': dict(),
330 330 'cf_info': dict(max_versions=10),
331 331 'cf_tag': dict(),
... ...
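
The get_table hunk above is indentation-only, but it spells out the schema: four column families, with LZO compression in the families_compressed variant. For context, a minimal happybase sketch of creating such a table (host and table name are hypothetical; create_table and tables are the standard happybase calls, per the happybase docs referenced elsewhere in this package):

    import happybase

    connection = happybase.Connection('HPC-server')   # hypothetical host
    table_name = 'ILSVRC_sample'                      # hypothetical table name
    if table_name not in connection.tables():
        connection.create_table(table_name, {
            'cf_pic': dict(compression='LZO'),
            'cf_info': dict(max_versions=10, compression='LZO'),
            'cf_tag': dict(compression='LZO'),
            'cf_feat': dict(compression='LZO'),
        })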
mdata/ILSVRC_S.py
1 1 __author__ = 'chunk'
2 2  
3 3 from . import *
4   -from ..mfeat import HOG, IntraBlockDiff
5   -from ..mspark import SC
  4 +from ..mfeat import IntraBlockDiff
  5 +from ..mspark import rdd, SC
6 6 from pyspark.mllib.regression import LabeledPoint
7 7 from ..common import *
8 8  
... ... @@ -135,11 +135,11 @@ class DataILSVRC_S(DataDumperBase):
135 135 tmpf.close()
136 136  
137 137 def _get_feat(self, image, feattype='ibd', **kwargs):
138   - size = kwargs.get('size', (48, 48))
139   -
140   - if feattype == 'hog':
141   - feater = HOG.FeatHOG(size=size)
142   - elif feattype == 'ibd':
  138 + # size = kwargs.get('size', (48, 48))
  139 + #
  140 + # if feattype == 'hog':
  141 + # feater = HOG.FeatHOG(size=size)
  142 + if feattype == 'ibd':
143 143 feater = IntraBlockDiff.FeatIntraBlockDiff()
144 144 else:
145 145 raise Exception("Unknown feature type!")
... ... @@ -267,16 +267,16 @@ class DataILSVRC_S(DataDumperBase):
267 267 ]
268 268  
269 269 # # Debug
270   - # tmp_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS,
  270 + # tmp_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_data_ILS,
271 271 # collect=False)
272   - # # tmp_data = tmp_data.mapValues(lambda data: [data] + SC.rddinfo_ILS(data))
  272 + # # tmp_data = tmp_data.mapValues(lambda data: [data] + rdd.rddinfo_ILS(data))
273 273 # print tmp_data.collect()[0][1]
274 274 # return
275 275  
276 276  
277   - self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS,
  277 + self.rdd_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_data_ILS,
278 278 collect=False).mapValues(
279   - lambda data: [data] + SC.rddinfo_ILS(data))
  279 + lambda data: [data] + rdd.rddinfo_ILS(data))
280 280  
281 281 if not writeback:
282 282 return self.rdd_data
... ... @@ -293,14 +293,14 @@ class DataILSVRC_S(DataDumperBase):
293 293 ]
294 294  
295 295 # # Debug
296   - # tmp_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS,
  296 + # tmp_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_data_ILS,
297 297 # collect=False)
298   - # # tmp_data = tmp_data.mapValues(lambda data: [data] + SC.rddinfo_ILS(data))
  298 + # # tmp_data = tmp_data.mapValues(lambda data: [data] + rdd.rddinfo_ILS(data))
299 299 # print tmp_data.collect()[0][1]
300 300 # return
301 301  
302 302  
303   - self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS,
  303 + self.rdd_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_data_ILS,
304 304 collect=False).mapValues(
305 305 lambda data: [data])
306 306  
... ... @@ -417,12 +417,12 @@ class DataILSVRC_S(DataDumperBase):
417 417 ]
418 418  
419 419 if readforward:
420   - self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
  420 + self.rdd_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_all_ILS, collect=False)
421 421  
422   - # rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=rate)).filter(lambda x: x != None)
  422 + # rdd_data_ext = self.rdd_data.map(lambda x: rdd.rddembed_ILS(x, rate=rate)).filter(lambda x: x != None)
423 423 # self.rdd_data = self.rdd_data.union(rdd_data_ext)
424 424  
425   - self.rdd_data = self.rdd_data.flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=rate))
  425 + self.rdd_data = self.rdd_data.flatMap(lambda x: rdd.rddembed_ILS_EXT(x, rate=rate))
426 426 if not writeback:
427 427 return self.rdd_data
428 428 else:
... ... @@ -513,9 +513,9 @@ class DataILSVRC_S(DataDumperBase):
513 513 ]
514 514  
515 515 if readforward:
516   - self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
  516 + self.rdd_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_all_ILS, collect=False)
517 517  
518   - self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddfeat_ILS(items, feattype))
  518 + self.rdd_data = self.rdd_data.mapValues(lambda items: rdd.rddfeat_ILS(items, feattype))
519 519  
520 520 # print self.rdd_data.collect()[0]
521 521 # return
... ... @@ -541,9 +541,9 @@ class DataILSVRC_S(DataDumperBase):
541 541 ]
542 542  
543 543 if readforward:
544   - self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False)
  544 + self.rdd_data = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_all_ILS, collect=False)
545 545  
546   - self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddanalysis_ILS(items))
  546 + self.rdd_data = self.rdd_data.mapValues(lambda items: rdd.rddanalysis_ILS(items))
547 547  
548 548 # print self.rdd_data.collect()[0]
549 549 # return
... ... @@ -621,7 +621,7 @@ class DataILSVRC_S(DataDumperBase):
621 621 self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
622 622 master='spark://HPC-server:7077')
623 623  
624   - rdd_dataset = self.sparker.read_hbase(self.table_name, func=SC.rddparse_dataset_ILS, collect=False)
  624 + rdd_dataset = self.sparker.read_hbase(self.table_name, func=rdd.rddparse_dataset_ILS, collect=False)
625 625 if not collect:
626 626 rdd_dataset = rdd_dataset.map(lambda x: LabeledPoint(x[0], x[1]))
627 627 return rdd_dataset
... ...
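
Read together, the ILSVRC_S.py hunks form one pipeline that is now expressed entirely against the rdd module. A condensed sketch of that flow (sparker and table_name as in the sketch after the commit message; writeback branches and Debug blocks omitted; the rate value is illustrative):

    # 1. Parse raw JPEG bytes out of HBase and attach image info.
    data = sparker.read_hbase(table_name, func=rdd.rddparse_data_ILS,
                              collect=False) \
                  .mapValues(lambda d: [d] + rdd.rddinfo_ILS(d))

    # 2. Stego-embed: flatMap keeps each original row and may add an embedded twin.
    data = data.flatMap(lambda x: rdd.rddembed_ILS_EXT(x, rate=0.1))

    # 3. Intra-block-diff features, then classification.
    data = data.mapValues(lambda items: rdd.rddfeat_ILS(items, 'ibd'))
    data = data.mapValues(lambda items: rdd.rddanalysis_ILS(items))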
mmodel/svm/SVM.py
... ... @@ -9,7 +9,7 @@ import os, sys
9 9 from ...mfeat import *
10 10 from ...mmodel import *
11 11 from ...mmodel.svm.svmutil import *
12   -from ...mspark import SC2
  12 +from ...mspark import SC
13 13 from ...common import *
14 14  
15 15 import numpy as np
... ... @@ -191,7 +191,7 @@ class ModelSVM(ModelBase):
191 191  
192 192 def _train_spark(self, X, Y=None):
193 193 if self.sparker == None:
194   - self.sparker = SC2.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077')
  194 + self.sparker = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077')
195 195  
196 196 self.model = self.sparker.train_svm(X, Y)
197 197  
... ...
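
The only change here is pointing ModelSVM at the consolidated SC module instead of the old SC2. train_svm is the repo's own wrapper and its body is not part of this diff; judging from the SVMWithSGD/LabeledPoint imports that SC.py retains, its core would look roughly like the following sketch (an assumption, not the repo's actual implementation):

    from pyspark.mllib.classification import SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint

    def train_svm(sc, X, Y, iterations=100):
        # Pair each feature vector with its label and fit a linear SVM via SGD.
        points = sc.parallelize([LabeledPoint(y, x) for x, y in zip(X, Y)])
        return SVMWithSGD.train(points, iterations=iterations)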
mspark/SC.py
... ... @@ -4,349 +4,20 @@ __author__ = 'chunk'
4 4 from ..common import *
5 5 from .dependencies import *
6 6 from . import *
7   -# from ..mdata import MSR, CV, ILSVRC, ILSVRC_S
8   -
9   -from ..mjpeg import *
10   -from ..msteg import *
11   -from ..msteg.steganography import LSB, F3, F4, F5
12   -from ..mfeat import IntraBlockDiff
13   -from ..mmodel.svm import SVM2
  7 +from .rdd import *
14 8  
15 9 import sys
16 10 from pyspark import RDD
17 11 from pyspark import SparkConf, SparkContext
18 12 from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
19 13 from pyspark.mllib.regression import LabeledPoint
20   -from numpy import array
21   -import json
22   -import pickle
23   -import tempfile
  14 +
24 15  
25 16 import numpy as np
26   -from scipy import stats
27   -from hashlib import md5
  17 +
28 18  
29 19 np.random.seed(sum(map(ord, "whoami")))
30 20 package_dir = os.path.dirname(os.path.abspath(__file__))
31   -classifier = SVM2.ModelSVM(toolset='sklearn')
32   -
33   -
34   -def rddparse_data_CV(raw_row):
35   - """
36   - input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
37   - return: ([0.056273,...],1)
38   - """
39   - data = raw_row[1].split('--%--')
40   - feat = json.loads(data[0].split(':')[-1])
41   - tag = 1 if data[-1].split(':')[-1] == 'True' else 0
42   - return (feat, tag)
43   -
44   -
45   -def rddparse_data_ILS(raw_row):
46   - """
47   - input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
48   - return: ([0.056273,...],1)
49   -
50   - In fact we can also use mapValues.
51   - """
52   - key = raw_row[0]
53   - # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
54   - # with open('/tmp/hhhh','wb') as f:
55   - # f.write(raw_row[1].decode('unicode-escape')).encode('latin-1')
56   - items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
57   - data = items[0].split('cf_pic:data:')[-1]
58   - return (key, data)
59   -
60   -
61   -def rddparse_all_ILS(raw_row):
62   - """
63   - Deprecated
64   - """
65   - key = raw_row[0]
66   - items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
67   -
68   - # @TODO
69   - # N.B "ValueError: No JSON object could be decoded" Because the spark-hbase IO is based on strings.
70   - # And the order of items is not as expected. See ../res/row-sample.txt or check in hbase shell for that.
71   -
72   - data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in
73   - items[1:]]
74   -
75   - return (key, data)
76   -
77   -
78   -def rddparse_dataset_ILS(raw_row):
79   - if raw_row[0] == '04650c488a2b163ca8a1f52da6022f03.jpg':
80   - print raw_row
81   - items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
82   - # tag = int(items[-2].split('cf_tag:' + tagtype)[-1])
83   - # feat = [item for sublist in json.loads(items[-1].split('cf_feat:' + feattype)[-1]) for subsublist in sublist for item in subsublist]
84   - tag = int(items[-1].split(':')[-1])
85   - feat = [item for sublist in json.loads(items[0].split(':')[-1]) for subsublist in sublist for
86   - item in subsublist]
87   -
88   - return (tag, feat)
89   -
90   -
91   -def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None):
92   - """
93   - Tempfile is our friend. (?)
94   - """
95   - info_rate = info_rate if info_rate != None else 0.0
96   - tag_chosen = tag_chosen if tag_chosen != None else stats.bernoulli.rvs(0.8)
97   - tag_class = tag_class if tag_class != None else 0
98   - try:
99   - tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b', delete=True)
100   - tmpf.write(img)
101   - tmpf.seek(0)
102   - im = Jpeg(tmpf.name, key=sample_key)
103   - info = [
104   - im.image_width,
105   - im.image_height,
106   - im.image_width * im.image_height,
107   - im.getCapacity(),
108   - im.getQuality(),
109   - info_rate,
110   - tag_chosen,
111   - tag_class
112   - ]
113   - return info
114   - except Exception as e:
115   - print e
116   - raise
117   - finally:
118   - tmpf.close()
119   -
120   -
121   -def rddembed_ILS(row, rate=None):
122   - """
123   - input:
124   - e.g. row =('row1',[1,3400,'hello'])
125   - return:
126   - newrow = ('row2',[34,5400,'embeded'])
127   - """
128   - items = row[1]
129   - capacity, chosen = int(items[4]), int(items[7])
130   - if chosen == 0:
131   - return None
132   - try:
133   - tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
134   - tmpf_src.write(items[0])
135   - tmpf_src.seek(0)
136   - tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
137   -
138   - steger = F5.F5(sample_key, 1)
139   -
140   - if rate == None:
141   - embed_rate = steger.embed_raw_data(tmpf_src.name,
142   - os.path.join(package_dir, '../res/toembed'),
143   - tmpf_dst.name)
144   - else:
145   - assert (rate >= 0 and rate < 1)
146   - # print capacity
147   - hidden = np.random.bytes(int(int(capacity) * rate) / 8)
148   - embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
149   -
150   - tmpf_dst.seek(0)
151   - raw = tmpf_dst.read()
152   - index = md5(raw).hexdigest()
153   -
154   - return (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))
155   -
156   - except Exception as e:
157   - print e
158   - raise
159   - finally:
160   - tmpf_src.close()
161   - tmpf_dst.close()
162   -
163   -
164   -def rddembed_ILS_EXT(row, rate=None):
165   - """
166   - input:
167   - e.g. row =('row1',[1,3400,'hello'])
168   - return:
169   - newrow = ('row2',[34,5400,'embeded']) or NULL
170   - [row,newrow]
171   - """
172   - items = row[1]
173   - capacity, chosen = int(items[4]), int(items[7])
174   - if chosen == 0:
175   - return [row]
176   - try:
177   - tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
178   - tmpf_src.write(items[0])
179   - tmpf_src.seek(0)
180   - tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
181   -
182   - steger = F5.F5(sample_key, 2)
183   -
184   - if rate == None:
185   - embed_rate = steger.embed_raw_data(tmpf_src.name,
186   - os.path.join(package_dir, '../res/toembed'),
187   - tmpf_dst.name)
188   - else:
189   - assert (rate >= 0 and rate < 1)
190   - # print capacity
191   - hidden = np.random.bytes(int(int(capacity) * rate) / 8)
192   - embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
193   -
194   - tmpf_dst.seek(0)
195   - raw = tmpf_dst.read()
196   - index = md5(raw).hexdigest()
197   -
198   - return [row, (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))]
199   -
200   - except Exception as e:
201   - print e
202   - raise
203   - finally:
204   - tmpf_src.close()
205   - tmpf_dst.close()
206   -
207   -
208   -def _get_feat(image, feattype='ibd', **kwargs):
209   - if feattype == 'ibd':
210   - feater = IntraBlockDiff.FeatIntraBlockDiff()
211   - else:
212   - raise Exception("Unknown feature type!")
213   -
214   - desc = feater.feat(image)
215   -
216   - return desc
217   -
218   -
219   -def rddfeat_ILS(items, feattype='ibd', **kwargs):
220   - try:
221   - tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
222   - tmpf_src.write(items[0])
223   - tmpf_src.seek(0)
224   -
225   - desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
226   - # print 'desccccccccccccccccccc',desc
227   - return items + [desc]
228   -
229   - except Exception as e:
230   - print e
231   - raise
232   - finally:
233   - tmpf_src.close()
234   -
235   -
236   -def rddanalysis_ILS(items, feattype='ibd', **kwargs):
237   - head = np.fromstring(items[0][:2], dtype=np.uint8)
238   - if not np.array_equal(head, [255, 216]):
239   - return items + [0]
240   - try:
241   - tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
242   - tmpf_src.write(items[0])
243   - tmpf_src.seek(0)
244   -
245   - desc = _get_feat(tmpf_src.name, feattype=feattype)
246   - tag = classifier.predict(desc.ravel())[0]
247   - # print 'desccccccccccccccccccc',desc
248   - return items + [tag]
249   -
250   - except Exception as e:
251   - print e
252   - raise
253   - finally:
254   - tmpf_src.close()
255   -
256   - # return items + classifier.predict(items[-1])
257   -
258   -
259   -def format_out(row, cols, withdata=False):
260   - """
261   - input:
262   - e.g. row =('row1',[1,3400,'hello'])
263   - cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
264   - return:
265   - [('row1',['row1', 'cf_info', 'id', '1']),('row1',['row1', 'cf_info', 'size', '3400']),('row1',['row1', 'cf_tag', 'desc', 'hello'])]
266   - """
267   - puts = []
268   - key = row[0]
269   - # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
270   - # print row
271   - if not withdata:
272   - for data, col in zip(row[1][1:], cols[1:]):
273   - puts.append((key, [key] + col + [str(data)]))
274   - else:
275   - for data, col in zip(row[1], cols):
276   - puts.append((key, [key] + col + [str(data)]))
277   - return puts
278   -
279   -
280   -# scconf = SparkConf()
281   -# scconf.setSparkHome("HPC-server") \
282   -# .setMaster("spark://HPC-server:7077") \
283   -# .setAppName("example")
284   -# sc = SparkContext(conf=scconf)
285   -#
286   -#
287   -# def read_hbase(table_name, func=None, collect=False):
288   -# """
289   -# ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
290   -#
291   -# Filter format:
292   -# columns=['cf1:col1', 'cf1:col2']
293   -# or
294   -# columns=['cf1']
295   -#
296   -# """
297   -#
298   -# hconf = {
299   -# "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
300   -# # "hbase.zookeeper.quorum": self.host,
301   -# "hbase.mapreduce.inputtable": table_name,
302   -# }
303   -#
304   -# hbase_rdd = sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
305   -# keyClass=hparams["readKeyClass"],
306   -# valueClass=hparams["readValueClass"],
307   -# keyConverter=hparams["readKeyConverter"],
308   -# valueConverter=hparams["readValueConverter"],
309   -# conf=hconf)
310   -#
311   -# parser = func if func != None else rddparse_data_CV
312   -# hbase_rdd = hbase_rdd.map(lambda x: parser(x))
313   -#
314   -# if collect:
315   -# return hbase_rdd.collect()
316   -# else:
317   -# return hbase_rdd
318   -#
319   -#
320   -# def write_hbase(table_name, data, fromrdd=False, columns=None, withdata=False):
321   -# """
322   -# Data Format: (Deprecated)
323   -# e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
324   -#
325   -# Data(from dictionary):
326   -# e.g. data ={'row1':[1,3400,'hello'], 'row2':[34,5000,'here in mine']},
327   -# cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
328   -# Data(from Rdd):
329   -# e.g. data =[('row1',[1,3400,'hello']), ('row2',[34,5000,'here in mine'])],
330   -# cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
331   -# """
332   -# hconf = {
333   -# "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", # "hbase.zookeeper.quorum": self.host,
334   -# "hbase.mapreduce.inputtable": table_name,
335   -# "hbase.mapred.outputtable": table_name,
336   -# "mapreduce.outputformat.class": hparams["outputFormatClass"],
337   -# "mapreduce.job.output.key.class": hparams["writeKeyClass"],
338   -# "mapreduce.job.output.value.class": hparams["writeValueClass"],
339   -# }
340   -# cols = [col.split(':') for col in columns]
341   -# if not fromrdd:
342   -# rdd_data = sc.parallelize(data)
343   -# else:
344   -# rdd_data = data
345   -#
346   -# rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
347   -# conf=hconf,
348   -# keyConverter=hparams["writeKeyConverter"],
349   -# valueConverter=hparams["writeValueConverter"])
350 21  
351 22  
352 23 class Sparker(object):
... ...
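
Everything deleted from SC.py above either moved verbatim into mspark/rdd.py or was an already-commented module-level duplicate of the Sparker methods. For reference, the HBase read path those comments describe (and which Sparker.read_hbase presumably retains) is the stock newAPIHadoopRDD call; this sketch is lifted from the deleted comments, with hparams being the package-level dict they reference and sc a live SparkContext:

    hconf = {
        "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
        "hbase.mapreduce.inputtable": table_name,
    }
    hbase_rdd = sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                   keyClass=hparams["readKeyClass"],
                                   valueClass=hparams["readValueClass"],
                                   keyConverter=hparams["readKeyConverter"],
                                   valueConverter=hparams["readValueConverter"],
                                   conf=hconf)
    rows = hbase_rdd.map(rdd.rddparse_data_ILS)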
mspark/rdd.py 0 → 100644
... ... @@ -0,0 +1,267 @@
  1 +__author__ = 'hadoop'
  2 +
  3 +from ..common import *
  4 +
  5 +from ..mjpeg import *
  6 +from ..msteg import *
  7 +from ..msteg.steganography import LSB, F3, F4, F5
  8 +from ..mfeat import IntraBlockDiff
  9 +from ..mmodel.svm import SVM
  10 +
  11 +from numpy import array
  12 +import json
  13 +import pickle
  14 +import tempfile
  15 +
  16 +import numpy as np
  17 +from scipy import stats
  18 +from hashlib import md5
  19 +
  20 +np.random.seed(sum(map(ord, "whoami")))
  21 +package_dir = os.path.dirname(os.path.abspath(__file__))
  22 +classifier = SVM.ModelSVM(toolset='sklearn')
  23 +
  24 +def rddparse_data_CV(raw_row):
  25 + """
  26 + input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
  27 + return: ([0.056273,...],1)
  28 + """
  29 + data = raw_row[1].split('--%--')
  30 + feat = json.loads(data[0].split(':')[-1])
  31 + tag = 1 if data[-1].split(':')[-1] == 'True' else 0
  32 + return (feat, tag)
  33 +
  34 +
  35 +def rddparse_data_ILS(raw_row):
  36 + """
  37 + input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
  38 + return: ([0.056273,...],1)
  39 +
  40 + In fact we can also use mapValues.
  41 + """
  42 + key = raw_row[0]
  43 + # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
  44 + # with open('/tmp/hhhh','wb') as f:
  45 + # f.write(raw_row[1].decode('unicode-escape')).encode('latin-1')
  46 + items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
  47 + data = items[0].split('cf_pic:data:')[-1]
  48 + return (key, data)
  49 +
  50 +
  51 +def rddparse_all_ILS(raw_row):
  52 + """
  53 + Deprecated
  54 + """
  55 + key = raw_row[0]
  56 + items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
  57 +
  58 + # @TODO
  59 + # N.B. "ValueError: No JSON object could be decoded" arises because the spark-hbase IO is string-based,
  60 + # and the order of items is not as expected. See ../res/row-sample.txt or check in the hbase shell.
  61 +
  62 + data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in
  63 + items[1:]]
  64 +
  65 + return (key, data)
  66 +
  67 +
  68 +def rddparse_dataset_ILS(raw_row):
  69 + if raw_row[0] == '04650c488a2b163ca8a1f52da6022f03.jpg':
  70 + print raw_row
  71 + items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
  72 + # tag = int(items[-2].split('cf_tag:' + tagtype)[-1])
  73 + # feat = [item for sublist in json.loads(items[-1].split('cf_feat:' + feattype)[-1]) for subsublist in sublist for item in subsublist]
  74 + tag = int(items[-1].split(':')[-1])
  75 + feat = [item for sublist in json.loads(items[0].split(':')[-1]) for subsublist in sublist for
  76 + item in subsublist]
  77 +
  78 + return (tag, feat)
  79 +
  80 +
  81 +def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None):
  82 + """
  83 + Tempfile is our friend. (?)
  84 + """
  85 + info_rate = info_rate if info_rate != None else 0.0
  86 + tag_chosen = tag_chosen if tag_chosen != None else stats.bernoulli.rvs(0.8)
  87 + tag_class = tag_class if tag_class != None else 0
  88 + try:
  89 + tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b', delete=True)
  90 + tmpf.write(img)
  91 + tmpf.seek(0)
  92 + im = Jpeg(tmpf.name, key=sample_key)
  93 + info = [
  94 + im.image_width,
  95 + im.image_height,
  96 + im.image_width * im.image_height,
  97 + im.getCapacity(),
  98 + im.getQuality(),
  99 + info_rate,
  100 + tag_chosen,
  101 + tag_class
  102 + ]
  103 + return info
  104 + except Exception as e:
  105 + print e
  106 + raise
  107 + finally:
  108 + tmpf.close()
  109 +
  110 +
  111 +def rddembed_ILS(row, rate=None):
  112 + """
  113 + input:
  114 + e.g. row =('row1',[1,3400,'hello'])
  115 + return:
  116 + newrow = ('row2',[34,5400,'embedded'])
  117 + """
  118 + items = row[1]
  119 + capacity, chosen = int(items[4]), int(items[7])
  120 + if chosen == 0:
  121 + return None
  122 + try:
  123 + tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  124 + tmpf_src.write(items[0])
  125 + tmpf_src.seek(0)
  126 + tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  127 +
  128 + steger = F5.F5(sample_key, 1)
  129 +
  130 + if rate == None:
  131 + embed_rate = steger.embed_raw_data(tmpf_src.name,
  132 + os.path.join(package_dir, '../res/toembed'),
  133 + tmpf_dst.name)
  134 + else:
  135 + assert (rate >= 0 and rate < 1)
  136 + # print capacity
  137 + hidden = np.random.bytes(int(int(capacity) * rate) / 8)
  138 + embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
  139 +
  140 + tmpf_dst.seek(0)
  141 + raw = tmpf_dst.read()
  142 + index = md5(raw).hexdigest()
  143 +
  144 + return (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))
  145 +
  146 + except Exception as e:
  147 + print e
  148 + raise
  149 + finally:
  150 + tmpf_src.close()
  151 + tmpf_dst.close()
  152 +
  153 +
  154 +def rddembed_ILS_EXT(row, rate=None):
  155 + """
  156 + input:
  157 + e.g. row =('row1',[1,3400,'hello'])
  158 + return:
  159 + newrow = ('row2',[34,5400,'embedded']) or NULL
  160 + [row,newrow]
  161 + """
  162 + items = row[1]
  163 + capacity, chosen = int(items[4]), int(items[7])
  164 + if chosen == 0:
  165 + return [row]
  166 + try:
  167 + tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  168 + tmpf_src.write(items[0])
  169 + tmpf_src.seek(0)
  170 + tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  171 +
  172 + steger = F5.F5(sample_key, 2)
  173 +
  174 + if rate == None:
  175 + embed_rate = steger.embed_raw_data(tmpf_src.name,
  176 + os.path.join(package_dir, '../res/toembed'),
  177 + tmpf_dst.name)
  178 + else:
  179 + assert (rate >= 0 and rate < 1)
  180 + # print capacity
  181 + hidden = np.random.bytes(int(int(capacity) * rate) / 8)
  182 + embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
  183 +
  184 + tmpf_dst.seek(0)
  185 + raw = tmpf_dst.read()
  186 + index = md5(raw).hexdigest()
  187 +
  188 + return [row, (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))]
  189 +
  190 + except Exception as e:
  191 + print e
  192 + raise
  193 + finally:
  194 + tmpf_src.close()
  195 + tmpf_dst.close()
  196 +
  197 +
  198 +def _get_feat(image, feattype='ibd', **kwargs):
  199 + if feattype == 'ibd':
  200 + feater = IntraBlockDiff.FeatIntraBlockDiff()
  201 + else:
  202 + raise Exception("Unknown feature type!")
  203 +
  204 + desc = feater.feat(image)
  205 +
  206 + return desc
  207 +
  208 +
  209 +def rddfeat_ILS(items, feattype='ibd', **kwargs):
  210 + try:
  211 + tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  212 + tmpf_src.write(items[0])
  213 + tmpf_src.seek(0)
  214 +
  215 + desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
  216 + # print 'desccccccccccccccccccc',desc
  217 + return items + [desc]
  218 +
  219 + except Exception as e:
  220 + print e
  221 + raise
  222 + finally:
  223 + tmpf_src.close()
  224 +
  225 +
  226 +def rddanalysis_ILS(items, feattype='ibd', **kwargs):
  227 + head = np.fromstring(items[0][:2], dtype=np.uint8)
  228 + if not np.array_equal(head, [255, 216]):
  229 + return items + [0]
  230 + try:
  231 + tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
  232 + tmpf_src.write(items[0])
  233 + tmpf_src.seek(0)
  234 +
  235 + desc = _get_feat(tmpf_src.name, feattype=feattype)
  236 + tag = classifier.predict(desc.ravel())[0]
  237 + # print 'desccccccccccccccccccc',desc
  238 + return items + [tag]
  239 +
  240 + except Exception as e:
  241 + print e
  242 + raise
  243 + finally:
  244 + tmpf_src.close()
  245 +
  246 + # return items + classifier.predict(items[-1])
  247 +
  248 +
  249 +def format_out(row, cols, withdata=False):
  250 + """
  251 + input:
  252 + e.g. row =('row1',[1,3400,'hello'])
  253 + cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
  254 + return:
  255 + [('row1',['row1', 'cf_info', 'id', '1']),('row1',['row1', 'cf_info', 'size', '3400']),('row1',['row1', 'cf_tag', 'desc', 'hello'])]
  256 + """
  257 + puts = []
  258 + key = row[0]
  259 + # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
  260 + # print row
  261 + if not withdata:
  262 + for data, col in zip(row[1][1:], cols[1:]):
  263 + puts.append((key, [key] + col + [str(data)]))
  264 + else:
  265 + for data, col in zip(row[1], cols):
  266 + puts.append((key, [key] + col + [str(data)]))
  267 + return puts
... ...
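
format_out's docstring carries a worked example; spelled out, with the default withdata=False shown as well (it drops column 0, so the raw cf_pic bytes are not rewritten on writeback):

    row = ('row1', [1, 3400, 'hello'])
    cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]

    format_out(row, cols, withdata=True)
    # [('row1', ['row1', 'cf_info', 'id', '1']),
    #  ('row1', ['row1', 'cf_info', 'size', '3400']),
    #  ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]

    format_out(row, cols)   # withdata=False: the data column is skipped
    # [('row1', ['row1', 'cf_info', 'size', '3400']),
    #  ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]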