ea1eb31a
Chunk
spark is privileg...
|
1
2
3
|
__author__ = 'chunk'
from . import *
|
84648488
Chunk
reverted.
|
4
|
from ..mfeat import HOG, IntraBlockDiff
|
ea1eb31a
Chunk
spark is privileg...
|
5
|
from ..mspark import SC
|
02528074
Chunk
staged.
|
6
|
from ..common import *
|
ea1eb31a
Chunk
spark is privileg...
|
7
8
9
|
import os, sys
from PIL import Image
|
ea1eb31a
Chunk
spark is privileg...
|
10
11
|
from hashlib import md5
import csv
|
ea1eb31a
Chunk
spark is privileg...
|
12
|
import shutil
|
ea1eb31a
Chunk
spark is privileg...
|
13
14
15
16
17
18
19
|
import json
import collections
import happybase
from ..mjpeg import *
from ..msteg import *
from ..msteg.steganography import LSB, F3, F4, F5
|
ea1eb31a
Chunk
spark is privileg...
|
20
21
|
import numpy as np
|
f25fd27c
Chunk
staged. 'hbase' m...
|
22
|
from numpy.random import randn
|
ea1eb31a
Chunk
spark is privileg...
|
23
24
25
26
27
28
|
import pandas as pd
from scipy import stats
from subprocess import Popen, PIPE, STDOUT
import tempfile
|
24768a99
Chunk
mode 'hbase' fini...
|
29
|
# Seed NumPy's global random state with a fixed value (the sum of the character codes of "whoami")
# so that the random draws made through np.random in this module repeat from run to run.
np.random.seed(sum(map(ord, "whoami")))
|
f25fd27c
Chunk
staged. 'hbase' m...
|
30
31
32
33
34
35
36
37
38
|
# Absolute directory of this module; used below to locate the bundled '../res/toembed' payload file.
package_dir = os.path.dirname(os.path.abspath(__file__))
class DataILSVRC_S(DataDumperBase):
"""
This module is specially for ILSVRC data processing under spark & hbase.
We posit that the DB(e.g. HBase) has only the images data with md5 name as id.
|
35cf2e3a
Chunk
staged.
|
39
|
The task is to generate info (size, capacity, quality, etc.) and class & chosen tags, then to perform embedding, and finally to calculate ibd features.
|
f25fd27c
Chunk
staged. 'hbase' m...
|
40
41
|
Each step includes reading from & writing to Hbase (though PC).
|
4f36b116
Chunk
staged.
|
42
|
And each step must have a 'spark' mode option, which means that the operation is performed by spark with reading & writing through RDDs.
|
1dc7c44b
Chunk
crawler-hbase-spa...
|
43
|
|
ea1eb31a
Chunk
spark is privileg...
|
44
|
copyright(c) 2015 chunkplus@gmail.com
|
1dc7c44b
Chunk
crawler-hbase-spa...
|
45
|
"""
|
ea1eb31a
Chunk
spark is privileg...
|
46
|
|
ea1eb31a
Chunk
spark is privileg...
|
47
48
|
def __init__(self, base_dir='/media/chunk/Elements/D/data/ImageNet/img/ILSVRC2013_DET_val', category='Train'):
    """
    Set up the dumper for one image directory / category pair.

    The table name is the last path component of `base_dir` joined to
    `category` with a dash, e.g. 'ILSVRC2013_DET_val-Train'.
    """
    DataDumperBase.__init__(self, base_dir, category)

    self.base_dir, self.category = base_dir, category
    # In-memory working set filled by the _extract_* / _embed_* steps.
    self.dict_data = {}
    # Created on first use by load_data() in 'spark' / 'cluster' mode.
    self.sparkcontex = None

    last_component = self.base_dir.strip('/').split('/')[-1]
    self.table_name = last_component + '-' + self.category
|
1dc7c44b
Chunk
crawler-hbase-spa...
|
57
|
|
f4fb4381
Chunk
staged.
|
58
59
|
def get_table(self):
    """
    Return the table handle for `self.table_name`, creating the table if it does not exist.

    The handle is cached on `self.table`, so later calls return it directly.
    The connection (host 'HPC-server') is opened on first use and cached on
    `self.connection`. A missing table is created with the column families
    'cf_pic', 'cf_info' (max_versions=10), 'cf_tag' and 'cf_feat'.

    :return: the table object obtained from the connection.
    """
    # Identity test: `!= None` would go through the handle's own comparison operators.
    if self.table is not None:
        return self.table

    if self.connection is None:
        self.connection = happybase.Connection('HPC-server')

    if self.table_name not in self.connection.tables():
        families = {'cf_pic': dict(),
                    'cf_info': dict(max_versions=10),
                    'cf_tag': dict(),
                    'cf_feat': dict(),
                    }
        self.connection.create_table(name=self.table_name, families=families)

    self.table = self.connection.table(name=self.table_name)
    return self.table
def _get_info(self, img, info_rate=None, tag_chosen=None, tag_class=None):
    """
    Build the info/tag record for one JPEG held in memory.

    The bytes are written to a temporary '.jpg' file because `Jpeg` is given
    a file name here.

    :param img: raw JPEG bytes.
    :param info_rate: embedding rate to record; defaults to 0.0.
    :param tag_chosen: 'chosen' tag; defaults to one draw of stats.bernoulli.rvs(0.8).
    :param tag_class: class tag; defaults to 0.
    :return: list of 8 strings
        [width, height, width*height, capacity, quality, rate, chosen, class],
        or None when anything fails (the exception is printed, not raised).
    """
    if info_rate is None:
        info_rate = 0.0
    if tag_chosen is None:
        tag_chosen = stats.bernoulli.rvs(0.8)
    if tag_class is None:
        tag_class = 0

    try:
        # `with` closes the temp file on every path; the old `finally: tmpf.close()`
        # raised NameError (hiding the real error) when the file could not be created.
        with tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf:
            tmpf.write(img)
            # Push the bytes to disk before the file is reopened by name.
            tmpf.flush()
            im = Jpeg(tmpf.name, key=sample_key)
            return [str(im.image_width),
                    str(im.image_height),
                    str(im.image_width * im.image_height),
                    str(im.getCapacity()),
                    str(im.getQuality()),
                    str(info_rate),
                    str(tag_chosen),
                    str(tag_class)]
    except Exception as e:
        # Best effort by design: report the problem and signal failure with None.
        print(e)
        return None
|
f25fd27c
Chunk
staged. 'hbase' m...
|
108
109
110
111
112
113
114
|
def _get_feat(self, image, feattype='ibd', **kwargs):
size = kwargs.get('size', (48, 48))
if feattype == 'hog':
feater = HOG.FeatHOG(size=size)
elif feattype == 'ibd':
feater = IntraBlockDiff.FeatIntraBlockDiff()
|
1c2a3fa0
Chunk
staged.
|
115
|
else:
|
f25fd27c
Chunk
staged. 'hbase' m...
|
116
117
118
|
raise Exception("Unknown feature type!")
desc = feater.feat(image)
|
24768a99
Chunk
mode 'hbase' fini...
|
119
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
120
|
return desc
|
1c2a3fa0
Chunk
staged.
|
121
122
123
124
125
126
127
128
129
130
|
def _extract_data(self, mode='hbase', writeback=False):
"""
Get info barely out of image data.
"""
if mode == 'hbase':
if self.table == None:
self.table = self.get_table()
cols = ['cf_pic:data']
|
f25fd27c
Chunk
staged. 'hbase' m...
|
131
132
133
134
135
136
137
|
for key, data in self.table.scan(columns=cols):
data = data['cf_pic:data']
self.dict_data[key] = [data] + self._get_info(data)
if not writeback:
return self.dict_data
else:
|
84648488
Chunk
reverted.
|
138
139
140
141
142
|
try:
with self.table.batch(batch_size=5000) as b:
for imgname, imginfo in self.dict_data.items():
b.put(imgname,
{
|
f25fd27c
Chunk
staged. 'hbase' m...
|
143
144
145
|
# 'cf_pic:data': imginfo[0],
'cf_info:width': imginfo[1],
'cf_info:height': imginfo[2],
|
ea1eb31a
Chunk
spark is privileg...
|
146
|
'cf_info:size': imginfo[3],
|
f25fd27c
Chunk
staged. 'hbase' m...
|
147
|
'cf_info:capacity': imginfo[4],
|
ea1eb31a
Chunk
spark is privileg...
|
148
|
'cf_info:quality': imginfo[5],
|
f25fd27c
Chunk
staged. 'hbase' m...
|
149
|
'cf_info:rate': imginfo[6],
|
ea1eb31a
Chunk
spark is privileg...
|
150
|
'cf_tag:chosen': imginfo[7],
|
84648488
Chunk
reverted.
|
151
|
'cf_tag:class': imginfo[8],
|
1c2a3fa0
Chunk
staged.
|
152
|
})
|
0fbc087e
Chunk
staged.
|
153
154
155
156
157
158
|
except ValueError:
raise
elif mode == 'spark':
pass
|
0fbc087e
Chunk
staged.
|
159
|
else:
|
1c2a3fa0
Chunk
staged.
|
160
161
162
163
164
|
raise Exception("Unknown mode!")
def _embed_data(self, mode='hbase', rate=None, readforward=False, writeback=False):
    """
    Create an F5 stego copy of every cached image and record its info.

    Each stego image is keyed by '<md5 of its bytes>.jpg' and gets an info
    record from `_get_info(raw, embed_rate, 0, 1)`, i.e. chosen tag 0 and
    class tag 1. The new records are merged into `self.dict_data`.

    :param mode: 'hbase' does the work; 'spark' is a no-op placeholder.
    :param rate: None embeds the bundled '../res/toembed' file; otherwise
        random bytes sized from the image's stored capacity times `rate`
        are embedded. Must satisfy 0 <= rate < 1.
    :param readforward: when true, reload `self.dict_data` from the table first.
    :param writeback: when true, write only the new stego records to the
        table and return None; otherwise return `self.dict_data`.
    :raises ValueError: if `rate` is outside [0, 1). This was an `assert`,
        which is stripped under `python -O`.
    :raises Exception: for an unknown `mode`; errors while embedding are
        printed and re-raised.
    """
    f5 = F5.F5(sample_key, 1)
    if mode == 'spark':
        # Not implemented yet.
        return None
    if mode != 'hbase':
        raise Exception("Unknown mode!")
    if rate is not None and not 0 <= rate < 1:
        raise ValueError("rate must satisfy 0 <= rate < 1, got %r" % (rate,))

    if self.table is None:
        self.table = self.get_table()

    if readforward:
        cols = ['cf_pic:data',
                'cf_info:width',
                'cf_info:height',
                'cf_info:size',
                'cf_info:capacity',
                'cf_info:quality',
                'cf_info:rate',
                'cf_tag:chosen',
                'cf_tag:class']
        self.dict_data = {}
        for key, row in self.table.scan(columns=cols):
            self.dict_data[key] = [row[c] for c in cols]

    dict_data_ext = {}
    for imgdata in self.dict_data.values():
        try:
            # `with` releases both temp files on every path; the old `finally` block raised
            # NameError (hiding the real error) when a file had not been created yet.
            with tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf_src, \
                    tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf_dst:
                tmpf_src.write(imgdata[0])
                # Push the bytes to disk before the file is handed over by name.
                tmpf_src.flush()
                if rate is None:
                    embed_rate = f5.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                                                   tmpf_dst.name)
                else:
                    # imgdata[4] is the stored capacity string (presumably in bits, hence the
                    # division by 8 -- TODO confirm). `//` keeps the byte count an int on Python 3.
                    hidden = np.random.bytes(int(int(imgdata[4]) * rate) // 8)
                    embed_rate = f5.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
                tmpf_dst.seek(0)
                raw = tmpf_dst.read()
                index = md5(raw).hexdigest()
                dict_data_ext[index + '.jpg'] = [raw] + self._get_info(raw, embed_rate, 0, 1)
        except Exception as e:
            print(e)
            raise

    self.dict_data.update(dict_data_ext)

    if not writeback:
        return self.dict_data

    with self.table.batch(batch_size=5000) as b:
        for imgname, imginfo in dict_data_ext.items():
            b.put(imgname,
                  {
                      'cf_pic:data': imginfo[0],
                      'cf_info:width': imginfo[1],
                      'cf_info:height': imginfo[2],
                      'cf_info:size': imginfo[3],
                      'cf_info:capacity': imginfo[4],
                      'cf_info:quality': imginfo[5],
                      'cf_info:rate': imginfo[6],
                      'cf_tag:chosen': imginfo[7],
                      'cf_tag:class': imginfo[8],
                  })
    return None
def _extract_feat(self, mode='hbase', feattype='ibd', readforward=False, writeback=False, **kwargs):
if mode == 'hbase':
if self.table == None:
self.table = self.get_table()
|
24768a99
Chunk
mode 'hbase' fini...
|
247
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
248
249
250
251
252
|
if readforward:
self.dict_data = {}
cols = ['cf_pic:data',
'cf_info:width',
'cf_info:height',
|
02528074
Chunk
staged.
|
253
254
|
'cf_info:size',
'cf_info:capacity',
|
0bd44a28
Chunk
staged.
|
255
|
'cf_info:quality',
|
0fbc087e
Chunk
staged.
|
256
|
'cf_info:rate',
|
1c2a3fa0
Chunk
staged.
|
257
258
259
260
261
262
263
264
265
266
267
|
'cf_tag:chosen',
'cf_tag:class']
for key, data in self.table.scan(columns=cols):
data = [data[k] for k in cols]
self.dict_data[key] = data
for imgname, imgdata in self.dict_data.items():
try:
tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
tmpf_src.write(imgdata[0])
tmpf_src.seek(0)
|
0fbc087e
Chunk
staged.
|
268
|
|
3b4e250d
Chunk
staged.
|
269
|
desc = json.dumps(self._get_feat(tmpf_src.name, feattype=feattype).tolist())
|
02528074
Chunk
staged.
|
270
|
|
1c2a3fa0
Chunk
staged.
|
271
272
273
274
|
self.dict_data[imgname].append(desc)
except Exception as e:
print e
|
3b4e250d
Chunk
staged.
|
275
276
|
raise
finally:
|
02528074
Chunk
staged.
|
277
|
tmpf_src.close()
|
0bd44a28
Chunk
staged.
|
278
|
|
1c2a3fa0
Chunk
staged.
|
279
|
if not writeback:
|
3b4e250d
Chunk
staged.
|
280
|
return self.dict_data
|
0fbc087e
Chunk
staged.
|
281
282
283
|
else:
try:
with self.table.batch(batch_size=5000) as b:
|
02528074
Chunk
staged.
|
284
|
for imgname, imginfo in self.dict_data.items():
|
0bd44a28
Chunk
staged.
|
285
|
b.put(imgname,
|
e3e7e73a
Chunk
spider standalone...
|
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
|
{
'cf_pic:data': imginfo[0],
'cf_info:width': imginfo[1],
'cf_info:height': imginfo[2],
'cf_info:size': imginfo[3],
'cf_info:capacity': imginfo[4],
'cf_info:quality': imginfo[5],
'cf_info:rate': imginfo[6],
'cf_tag:chosen': imginfo[7],
'cf_tag:class': imginfo[8],
'cf_feat:' + feattype: imginfo[9]})
except ValueError:
raise
elif mode == 'spark':
pass
else:
raise Exception("Unknown mode!")
def format(self):
    """
    Run the full 'hbase' pipeline in order: extract info, embed at rate 0.1, extract 'ibd' features.

    Only the last stage is asked to write back to the table; the first two
    stages pass writeback=False.
    """
    pipeline = (
        (self._extract_data, dict(mode='hbase', writeback=False)),
        (self._embed_data, dict(mode='hbase', rate=0.1, readforward=False, writeback=False)),
        (self._extract_feat, dict(mode='hbase', feattype='ibd', readforward=False, writeback=True)),
    )
    for stage, options in pipeline:
        stage(**options)
|
0fbc087e
Chunk
staged.
|
312
|
def load_data(self, mode='local', feattype='ibd', tagtype='class'):
    """
    Load feature vectors and labels.

    :param mode:
        'local' -- read `self.list_file` (tab-separated: first field is the
            image hash, last field the tag) and the JSON feature files under
            `self.feat_dir/<hash[:3]>/<hash[3:]>.<feattype>`; each feature is
            flattened from three nesting levels into one list.
        'remote' / 'hbase' -- scan the 'cf_feat:<feattype>' and
            'cf_tag:<tagtype>' columns of the table.
        'spark' / 'cluster' -- read (feat, tag) pairs through
            `SC.Sparker.read_hbase`.
    :param feattype: feature name (file extension / column qualifier).
    :param tagtype: tag column qualifier, used in 'remote' / 'hbase' mode only.
    :return: tuple (X, Y) of parallel lists: features and labels.
    :raises Exception: for an unknown `mode`.
    """
    X = []
    Y = []
    if mode == "local":
        # csv needs a binary file on Python 2 but a text file opened with newline='' on Python 3.
        if sys.version_info[0] < 3:
            list_handle = open(self.list_file, 'rb')
        else:
            list_handle = open(self.list_file, 'r', newline='')

        dict_dataset = {}
        with list_handle as tsvfile:
            for line in csv.reader(tsvfile, delimiter='\t'):
                if not line:
                    continue
                img_hash, tag = line[0], line[-1]
                path_feat = os.path.join(self.feat_dir, img_hash[:3], img_hash[3:] + '.' + feattype)
                # The old test `if path_feat:` was always true; skip images with no feature file.
                if not os.path.exists(path_feat):
                    continue
                with open(path_feat, 'rb') as featfile:
                    dict_dataset[img_hash] = (tag, json.loads(featfile.read()))
        for tag, feat in dict_dataset.values():
            X.append([item for sublist in feat for subsublist in sublist for item in subsublist])
            Y.append(int(tag))
    elif mode == "remote" or mode == "hbase":
        if self.table is None:
            self.table = self.get_table()

        col_feat, col_tag = 'cf_feat:' + feattype, 'cf_tag:' + tagtype
        for key, data in self.table.scan(columns=[col_feat, col_tag]):
            X.append(json.loads(data[col_feat]))
            # _get_info stores tags as '0' / '1'; comparing only against 'True' labelled
            # every row 0. 'True' is still accepted for rows written that way.
            Y.append(1 if data[col_tag] in ('True', '1') else 0)
    elif mode == "spark" or mode == "cluster":
        if self.sparkcontex is None:
            self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077')
        result = self.sparkcontex.read_hbase(self.table_name)  # result = {key:[feat,tag],...}
        # NOTE(review): the comment above says a dict comes back; iterating a dict yields its
        # keys, so take the values in that case. Confirm the real return type of read_hbase.
        pairs = result.values() if isinstance(result, dict) else result
        for feat, tag in pairs:
            X.append(feat)
            Y.append(tag)
    else:
        raise Exception("Unknown mode!")
    return X, Y
|
0fbc087e
Chunk
staged.
|
|
|
84648488
Chunk
reverted.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
1dc7c44b
Chunk
crawler-hbase-spa...
|
|
|
84648488
Chunk
reverted.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
24768a99
Chunk
mode 'hbase' fini...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
84648488
Chunk
reverted.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
489c5608
Chunk
debugging...
|
|
|
0fbc087e
Chunk
staged.
|
|
|
489c5608
Chunk
debugging...
|
|
|
0fbc087e
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
0fbc087e
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
84648488
Chunk
reverted.
|
|
|
f1fa5b17
Chunk
review & streaming.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
24768a99
Chunk
mode 'hbase' fini...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
24768a99
Chunk
mode 'hbase' fini...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
24768a99
Chunk
mode 'hbase' fini...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
24768a99
Chunk
mode 'hbase' fini...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
2c507774
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
2c507774
Chunk
staged.
|
|
|
84648488
Chunk
reverted.
|
|
|
2c507774
Chunk
staged.
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
8bddd8b3
Chunk
You guess what? T...
|
|
|
2c507774
Chunk
staged.
|
|
|
1c2a3fa0
Chunk
staged.
|
|
|
2c507774
Chunk
staged.
|
|
|
f1fa5b17
Chunk
review & streaming.
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
e3ec1f74
Chunk
staged.
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
84648488
Chunk
reverted.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
f1fa5b17
Chunk
review & streaming.
|
|
|
f25fd27c
Chunk
staged. 'hbase' m...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
84648488
Chunk
reverted.
|
|
|
02528074
Chunk
staged.
|
|
|
f1fa5b17
Chunk
review & streaming.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
84648488
Chunk
reverted.
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
84648488
Chunk
reverted.
|
|
|