ea1eb31a
Chunk
spark is privileg...
|
1
2
3
|
__author__ = 'chunk'
from . import *
|
84648488
Chunk
reverted.
|
4
|
from ..mfeat import HOG, IntraBlockDiff
|
ea1eb31a
Chunk
spark is privileg...
|
5
|
from ..mspark import SC
|
02528074
Chunk
staged.
|
6
|
from pyspark.mllib.regression import LabeledPoint
|
ea1eb31a
Chunk
spark is privileg...
|
7
8
9
|
from ..common import *
import os, sys
|
ea1eb31a
Chunk
spark is privileg...
|
10
11
|
from hashlib import md5
import csv
|
ea1eb31a
Chunk
spark is privileg...
|
12
|
import json
|
ea1eb31a
Chunk
spark is privileg...
|
13
14
15
16
17
18
19
|
import happybase
from ..mjpeg import *
from ..msteg import *
from ..msteg.steganography import LSB, F3, F4, F5
import numpy as np
|
ea1eb31a
Chunk
spark is privileg...
|
20
21
|
from scipy import stats
|
f25fd27c
Chunk
staged. 'hbase' m...
|
22
|
import tempfile
|
ea1eb31a
Chunk
spark is privileg...
|
23
24
25
26
27
28
|
# Seed numpy's global RNG with a fixed value (the sum of the character codes of
# "whoami") so np.random draws made in this process are reproducible.
np.random.seed(sum(ord(ch) for ch in "whoami"))

# Absolute directory containing this module; used to locate bundled resources.
package_dir = os.path.split(os.path.abspath(__file__))[0]
|
24768a99
Chunk
mode 'hbase' fini...
|
29
|
class DataILSVRC_S(DataDumperBase):
    """
    ILSVRC data processing on top of Spark & HBase.

    We assume the DB (e.g. HBase) initially holds only the image data, keyed by the
    md5 of the image. The pipeline generates per-image info (size, capacity, quality,
    etc.) plus 'chosen' and 'class' tags, then performs embedding, and finally
    calculates IBD features.
    Each step reads from and writes to HBase (through the PC), and each step also
    offers a 'spark' mode in which the work is done by Spark, reading and writing
    through RDDs.

    Row layout used by every step (list index: HBase column):
        0 cf_pic:data, 1 cf_info:width, 2 cf_info:height, 3 cf_info:size,
        4 cf_info:capacity, 5 cf_info:quality, 6 cf_info:rate,
        7 cf_tag:chosen, 8 cf_tag:class, [9 cf_feat:<feattype>]

    copyright(c) 2015 chunkplus@gmail.com
    """

    # Columns read/written by the pipeline, in row-list order (see class docstring).
    _ROW_COLS = (
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
    )

    def __init__(self, base='ILSVRC2013_DET_val', category='Train_1'):
        """
        base: dataset name or path; its last path component names the HBase table.
        category: optional subset name; when not None the table is '<base>-<category>'.
        """
        DataDumperBase.__init__(self, base, category)

        self.base = base
        self.category = category

        self.dict_data = {}  # row key -> row list, filled by the 'hbase' modes
        self.rdd_data = None  # pair RDD produced by the 'spark' modes

        self.table_name = self.base.strip('/').split('/')[-1]
        if self.category is not None:
            self.table_name += '-' + self.category
        self.sparker = None

        # F5 embedder. NOTE(review): sample_key is not defined here; presumably it
        # comes from one of the module's star imports.
        self.steger = F5.F5(sample_key, 1)

    # ------------------------------------------------------------------ helpers

    def _get_connection(self):
        """Return the cached happybase connection, opening it on first use."""
        if self.connection is None:
            self.connection = happybase.Connection('HPC-server')
        return self.connection

    def _get_sparker(self):
        """Return the cached Sparker, creating it on first use."""
        if self.sparker is None:
            self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
                                      master='spark://HPC-server:7077')
        return self.sparker

    def _scan_rows(self):
        """Read every row of the table as {key: [values in _ROW_COLS order]}."""
        cols = list(self._ROW_COLS)
        return {key: [data[k] for k in cols]
                for key, data in self.table.scan(columns=cols)}

    def _write_rows(self, rows, withdata=True, featcol=None):
        """
        Batch-write row lists to the table.

        rows: {key: row list} laid out as in the class docstring.
        withdata: also write cf_pic:data (row[0]).
        featcol: when given, write row[9] into that column.
        """
        with self.table.batch(batch_size=5000) as b:
            for imgname, imginfo in rows.items():
                cells = {
                    'cf_info:width': str(imginfo[1]),
                    'cf_info:height': str(imginfo[2]),
                    'cf_info:size': str(imginfo[3]),
                    'cf_info:capacity': str(imginfo[4]),
                    'cf_info:quality': str(imginfo[5]),
                    'cf_info:rate': str(imginfo[6]),
                    'cf_tag:chosen': str(imginfo[7]),
                    'cf_tag:class': str(imginfo[8]),
                }
                if withdata:
                    cells['cf_pic:data'] = imginfo[0]
                if featcol is not None:
                    cells[featcol] = imginfo[9]
                b.put(imgname, cells)

    def _embed_one(self, raw_src, capacity, rate):
        """
        Embed a payload into one JPEG and describe the resulting stego image.

        raw_src: raw bytes of the cover JPEG.
        capacity: anything int() accepts; the value reported by Jpeg.getCapacity()
            (presumably in bits, hence the // 8 below).
        rate: None embeds the bundled file '../res/toembed'; otherwise a value in
            [0, 1) giving the fraction of capacity filled with random bytes.

        Returns (md5(stego) + '.jpg', [stego bytes] + info), with info tagged
        chosen=0, class=1. Raises ValueError when rate is outside [0, 1).
        """
        if rate is not None and not (0 <= rate < 1):
            raise ValueError("rate must be in [0, 1), got %r" % (rate,))
        try:
            with tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf_src, \
                    tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf_dst:
                tmpf_src.write(raw_src)
                tmpf_src.seek(0)  # flushes so the steger can read the file by name
                if rate is None:
                    embed_rate = self.steger.embed_raw_data(
                        tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                        tmpf_dst.name)
                else:
                    # // keeps the byte count integral on both Python 2 and 3
                    hidden = np.random.bytes(int(int(capacity) * rate) // 8)
                    embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden,
                                                            tmpf_dst.name, frommem=True)
                tmpf_dst.seek(0)
                raw = tmpf_dst.read()
        except Exception as e:
            print(e)
            raise
        index = md5(raw).hexdigest()
        return (index + '.jpg', [raw] + self._get_info(raw, embed_rate, 0, 1))

    # ------------------------------------------------------------------ table

    def get_table(self):
        """
        Return the happybase table for this dataset, cached on self.table.

        The table is created if missing, with families cf_pic, cf_info
        (max_versions=10), cf_tag and cf_feat.
        """
        if self.table is not None:
            return self.table

        connection = self._get_connection()
        if self.table_name not in connection.tables():
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            connection.create_table(name=self.table_name, families=families)
        self.table = connection.table(name=self.table_name)
        return self.table

    def delete_table(self, table_name=None, disable=True):
        """
        Delete an HBase table (defaults to this dataset's table).

        Returns False if the table does not exist, True once it is deleted.
        Errors from happybase are reported and re-raised.
        """
        if table_name is None:
            table_name = self.table_name
        connection = self._get_connection()
        if table_name not in connection.tables():
            return False
        try:
            connection.delete_table(table_name, disable)
        except Exception:
            print('Exception when deleting table.')
            raise
        if table_name == self.table_name:
            # Drop the stale handle so get_table() recreates the table next time.
            self.table = None
        return True

    # ------------------------------------------------------------------ per-image work

    def _get_info(self, img, info_rate=None, tag_chosen=None, tag_class=None):
        """
        Describe one JPEG given as raw bytes.

        Returns [width, height, width*height, capacity, quality, info_rate,
        tag_chosen, tag_class]. Defaults: info_rate 0.0, tag_chosen a
        Bernoulli(0.8) draw, tag_class 0.
        Errors while decoding the image are printed and re-raised; previously they
        were swallowed and None was returned, which every caller then failed to
        concatenate with a list.
        """
        info_rate = info_rate if info_rate is not None else 0.0
        tag_chosen = tag_chosen if tag_chosen is not None else stats.bernoulli.rvs(0.8)
        tag_class = tag_class if tag_class is not None else 0

        try:
            # Jpeg reads from a path, so spill the bytes to a temp file.
            with tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf:
                tmpf.write(img)
                tmpf.seek(0)
                im = Jpeg(tmpf.name, key=sample_key)
                return [
                    im.image_width,
                    im.image_height,
                    im.image_width * im.image_height,
                    im.getCapacity(),
                    im.getQuality(),
                    info_rate,
                    tag_chosen,
                    tag_class,
                ]
        except Exception as e:
            print(e)
            raise

    def _get_feat(self, image, feattype='ibd', **kwargs):
        """
        Compute a feature descriptor; `image` is passed straight to the extractor's
        feat() (callers in this class pass a file path).

        feattype: 'hog' (uses kwargs['size'], default (48, 48)) or 'ibd'.
        Raises Exception for any other feattype.
        """
        if feattype == 'hog':
            feater = HOG.FeatHOG(size=kwargs.get('size', (48, 48)))
        elif feattype == 'ibd':
            feater = IntraBlockDiff.FeatIntraBlockDiff()
        else:
            raise Exception("Unknown feature type!")
        return feater.feat(image)

    @staticmethod
    def _rddparse_data(raw_row):
        """
        Extract the image bytes from one raw row produced by the Spark HBase reader.

        input: (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\\ufffd...--%--cf_tag:hog:True')
        return: (key, image bytes taken from the first '--%--' cell after 'cf_pic:data:')

        In fact we can also use mapValues.
        """
        key = raw_row[0]
        items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
        # maxsplit=1 so marker-like bytes inside the image data are not cut off
        data = items[0].split('cf_pic:data:', 1)[-1]
        return (key, data)

    @staticmethod
    def _rddparse_all(raw_row):
        """
        Parse one raw row into (key, [image bytes] + json-decoded cell values).

        The first '--%--' cell is treated as 'cf_pic:data:<bytes>'; every later cell
        is 'family:qualifier:value' and its value is decoded with json.loads.
        NOTE(review): image bytes that happen to contain '--%--' would break the split.
        """
        key = raw_row[0]
        items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
        data = [items[0].split('cf_pic:data:', 1)[-1]]
        # maxsplit=2 keeps any ':' inside the value intact
        data += [json.loads(item.split(':', 2)[-1]) for item in items[1:]]
        return (key, data)

    def _rdd_embed(self, row):
        """
        Embed into one parsed row.

        input: e.g. row = ('row1', [data, width, height, size, capacity, quality,
               rate, chosen, class])
        return: ('<md5>.jpg', [stego bytes] + info) or None when chosen == 0.

        NOTE(review): the embedding rate is taken from the row's own cf_info:rate,
        not from a caller-supplied rate.
        """
        items = row[1]
        capacity, rate, chosen = items[4], items[6], items[7]
        if chosen == 0:
            return None
        return self._embed_one(items[0], capacity, rate)

    # ------------------------------------------------------------------ pipeline steps

    def _extract_data(self, mode='hbase', writeback=False, withdata=True):
        """
        Compute info/tags purely from the stored image data.

        mode 'hbase': fills self.dict_data and returns it unless writeback, in which
            case info/tag columns (not cf_pic:data) are written back.
        mode 'spark': builds self.rdd_data and returns it unless writeback, in which
            case it is written back (withdata controls the data column).
        Raises Exception for an unknown mode.
        """
        if mode == 'hbase':
            table = self.get_table()
            for key, data in table.scan(columns=['cf_pic:data']):
                img = data['cf_pic:data']
                self.dict_data[key] = [img] + self._get_info(img)
            if not writeback:
                return self.dict_data
            self._write_rows(self.dict_data, withdata=False)
        elif mode == 'spark':
            sparker = self._get_sparker()
            self.rdd_data = sparker.read_hbase(
                self.table_name, func=SC.rddparse_data_ILS, collect=False).mapValues(
                lambda data: [data] + SC.rddinfo_ILS(data))
            if not writeback:
                return self.rdd_data
            sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True,
                                columns=list(self._ROW_COLS), withdata=withdata)
        else:
            raise Exception("Unknown mode!")

    def _embed_data(self, mode='hbase', rate=None, readforward=False, writeback=False, withdata=True):
        """
        Create a stego copy of every row tagged chosen != 0.

        rate: None embeds the bundled '../res/toembed'; otherwise a value in [0, 1).
        readforward: reload rows from HBase instead of reusing the previous step's data.
        mode 'hbase': new rows are merged into self.dict_data; with writeback only the
            new rows are written (always including cf_pic:data).
        mode 'spark': self.rdd_data is flatMapped through SC.rddembed_ILS_EXT.
        Raises Exception for an unknown mode.
        """
        if mode == 'hbase':
            self.get_table()
            if readforward:
                self.dict_data = self._scan_rows()

            dict_data_ext = {}
            for imgname, imgdata in self.dict_data.items():
                capacity, chosen = int(imgdata[4]), int(imgdata[7])
                if chosen == 0:
                    continue
                key, newrow = self._embed_one(imgdata[0], capacity, rate)
                dict_data_ext[key] = newrow
            self.dict_data.update(dict_data_ext)

            if not writeback:
                return self.dict_data
            self._write_rows(dict_data_ext, withdata=True)
        elif mode == 'spark':
            sparker = self._get_sparker()
            if readforward:
                self.rdd_data = sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS,
                                                   collect=False)
            self.rdd_data = self.rdd_data.flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=rate))
            if not writeback:
                return self.rdd_data
            sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True,
                                columns=list(self._ROW_COLS), withdata=withdata)
        else:
            raise Exception("Unknown mode!")

    def _extract_feat(self, mode='hbase', feattype='ibd', readforward=False, writeback=False, withdata=False):
        """
        Compute a `feattype` feature for every row and append it (JSON) as row[9].

        mode 'hbase': with writeback every row, including cf_pic:data, is written
            together with 'cf_feat:<feattype>'.
        mode 'spark': values are mapped through SC.rddfeat_ILS.
            NOTE(review): feattype is not passed to SC.rddfeat_ILS; confirm it matches.
        Raises Exception for an unknown mode.
        """
        featcol = 'cf_feat:' + feattype
        if mode == 'hbase':
            self.get_table()
            if readforward:
                self.dict_data = self._scan_rows()

            for imgname, imgdata in self.dict_data.items():
                try:
                    with tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') as tmpf_src:
                        tmpf_src.write(imgdata[0])
                        tmpf_src.seek(0)
                        desc = json.dumps(self._get_feat(tmpf_src.name, feattype=feattype).tolist())
                except Exception as e:
                    print(e)
                    raise
                imgdata.append(desc)

            if not writeback:
                return self.dict_data
            self._write_rows(self.dict_data, withdata=True, featcol=featcol)
        elif mode == 'spark':
            sparker = self._get_sparker()
            if readforward:
                self.rdd_data = sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS,
                                                   collect=False)
            self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddfeat_ILS(items))
            if not writeback:
                return self.rdd_data
            sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True,
                                columns=list(self._ROW_COLS) + [featcol], withdata=withdata)
        else:
            raise Exception("Unknown mode!")

    def format(self):
        """
        Run the full pipeline in 'hbase' mode: extract info, embed at rate 0.1 (kept
        in memory), then extract IBD features and write every row back to HBase.
        """
        self._extract_data(mode='hbase', writeback=False)
        self._embed_data(mode='hbase', rate=0.1, readforward=False, writeback=False)
        self._extract_feat(mode='hbase', feattype='ibd', readforward=False, writeback=True)

    def load_data(self, mode='hbase', feattype='ibd', tagtype='class', collect=False):
        """
        Load (X, Y) = (flattened feature lists, integer tags).

        mode 'local': read hashes/tags from self.list_file (TSV: hash first, tag last);
            feattype 'coef' reads Y-channel DCT coefficients from the image files,
            anything else reads JSON features from self.feat_dir. Entries whose file
            does not exist are skipped.
        mode 'hbase': read 'cf_feat:<feattype>' and 'cf_tag:<tagtype>' columns.
        mode 'spark'/'cluster': when collect is False return an RDD of LabeledPoint,
            otherwise collect into X, Y.
        Raises Exception for an unknown mode.
        """
        X = []
        Y = []
        if mode == "local":
            dict_dataset = {}
            if feattype == 'coef':  # raw
                # Flat C-order positions of every 8th row/column, built once. NOTE(review):
                # presumably the 8x8-block DC terms of a 304-wide coefficient matrix.
                dc_index = [i * 304 + j for i in range(0, 304, 8) for j in range(0, 304, 8)]
                with open(self.list_file, 'rb') as tsvfile:
                    for line in csv.reader(tsvfile, delimiter='\t'):
                        img_hash, tag = line[0], line[-1]
                        image = os.path.join(self.img_dir, img_hash[:3], img_hash[3:] + '.jpg')
                        if os.path.isfile(image):
                            im = Jpeg(image, key=sample_key)
                            dict_dataset[img_hash] = (tag, im.getCoefMatrix(channel='Y'))
                for tag, feat in dict_dataset.values():
                    # .flat writes in place even when ravel() would return a copy
                    feat.flat[dc_index] = 0
                    X.append(feat.tolist())
                    Y.append(int(tag))
            else:
                with open(self.list_file, 'rb') as tsvfile:
                    for line in csv.reader(tsvfile, delimiter='\t'):
                        img_hash, tag = line[0], line[-1]
                        path_feat = os.path.join(self.feat_dir, img_hash[:3],
                                                 img_hash[3:] + '.' + feattype)
                        if os.path.isfile(path_feat):
                            with open(path_feat, 'rb') as featfile:
                                dict_dataset[img_hash] = (tag, json.loads(featfile.read()))
                for tag, feat in dict_dataset.values():
                    X.append(np.array(feat).ravel().tolist())
                    Y.append(int(tag))
        elif mode == "hbase":
            table = self.get_table()
            col_feat, col_tag = 'cf_feat:' + feattype, 'cf_tag:' + tagtype
            for key, data in table.scan(columns=[col_feat, col_tag]):
                X.append(np.array(json.loads(data[col_feat])).ravel().tolist())
                Y.append(int(data[col_tag]))
        elif mode == "spark" or mode == "cluster":
            sparker = self._get_sparker()
            rdd_dataset = sparker.read_hbase(self.table_name, func=SC.rddparse_dataset_ILS,
                                             collect=False)
            if not collect:
                return rdd_dataset.map(lambda x: LabeledPoint(x[0], x[1]))
            for tag, feat in rdd_dataset.collect():
                X.append(feat)
                Y.append(tag)
        else:
            raise Exception("Unknown mode!")

        return X, Y
|
0bd44a28
Chunk
staged.
|
|
|
e3e7e73a
Chunk
spider standalone...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
0bd44a28
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
02528074
Chunk
staged.
|
|
|
0bd44a28
Chunk
staged.
|
|
|
02528074
Chunk
staged.
|
|
|
84648488
Chunk
reverted.
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
02528074
Chunk
staged.
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
ea1eb31a
Chunk
spark is privileg...
|
|
|
84648488
Chunk
reverted.
|
|
|