# -*- coding: utf-8 -*-
__author__ = 'chunk'
from ..common import *
from .dependencies import *
from . import *
# from ..mdata import MSR, CV, ILSVRC, ILSVRC_S
from ..mjpeg import *
from ..msteg import *
from ..msteg.steganography import LSB, F3, F4, F5
from ..mfeat import IntraBlockDiff
|
import os
import sys
from pyspark import RDD
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
import json
import pickle
import tempfile
import numpy as np
from scipy import stats
from hashlib import md5
np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))
|
|
def rddparse_data_CV(raw_row):
    """
    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ([0.056273,...],1)
    """
    data = raw_row[1].split('--%--')
    feat = json.loads(data[0].split(':')[-1])
    tag = 1 if data[-1].split(':')[-1] == 'True' else 0
    return (feat, tag)
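
# A hedged usage sketch of rddparse_data_CV (the key and cell values here are
# made up; the '--%--'-joined cell layout is taken from the docstring above):
#
#   row = (u'key0', u'cf_feat:hog:[0.05, 0.12]--%--cf_tag:hog:True')
#   rddparse_data_CV(row)   # -> ([0.05, 0.12], 1)
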
def rddparse_data_ILS(raw_row):
"""
|
3b4e250d
Chunk
staged.
|
45
46
47
48
49
50
51
|
input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
return: ([0.056273,...],1)
In fact we can also use mapValues.
"""
key = raw_row[0]
# if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
|
3b4e250d
Chunk
staged.
|
52
|
# with open('/tmp/hhhh','wb') as f:
|
1c2a3fa0
Chunk
staged.
|
53
54
55
56
|
# f.write(raw_row[1].decode('unicode-escape')).encode('latin-1')
items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
data = items[0].split('cf_pic:data:')[-1]
return (key, data)
|
3b4e250d
Chunk
staged.
|
57
58
59
60
61
|
def rddparse_all_ILS(raw_row):
"""
Deprecated
    """
    key = raw_row[0]
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
    # @TODO
    # N.B. "ValueError: No JSON object could be decoded" happens because the spark-hbase
    # IO is string-based, and the order of items is not as expected.
    # See ../res/row-sample.txt or check in hbase shell for details.
    data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]]

    return (key, data)


def rddparse_dataset_ILS(raw_row):
    if raw_row[0] == '04650c488a2b163ca8a1f52da6022f03.jpg':
        print raw_row
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
    # tag = int(items[-2].split('cf_tag:' + tagtype)[-1])
    # feat = [item for sublist in json.loads(items[-1].split('cf_feat:' + feattype)[-1]) for subsublist in sublist for item in subsublist]
    tag = int(items[-1].split(':')[-1])
    feat = [item for sublist in json.loads(items[0].split(':')[-1]) for subsublist in sublist for item in subsublist]
    return (tag, feat)
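
# Note on the comprehension above: the stored feature arrives as a three-level
# nested list (the depth is inferred from the code, not documented), and the
# comprehension flattens it to 1-D. A minimal sketch:
#
#   nested = [[[1, 2], [3]], [[4]]]
#   flat = [v for sub in nested for subsub in sub for v in subsub]   # -> [1, 2, 3, 4]
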

def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None):
    """
    Tempfile is our friend. (?)
    """
    info_rate = info_rate if info_rate is not None else 0.0
    tag_chosen = tag_chosen if tag_chosen is not None else stats.bernoulli.rvs(0.8)
    tag_class = tag_class if tag_class is not None else 0
    tmpf = None
    try:
        tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b', delete=True)
        tmpf.write(img)
        tmpf.seek(0)
        im = Jpeg(tmpf.name, key=sample_key)
        info = [
            im.image_width,
            im.image_height,
            im.image_width * im.image_height,
            im.getCapacity(),
            im.getQuality(),
            info_rate,
            tag_chosen,
            tag_class
        ]
        return info
    except Exception as e:
        print e
        raise
    finally:
        if tmpf is not None:
            tmpf.close()
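
# For reference, the info vector above is, in order:
#   [image_width, image_height, width * height, capacity, quality,
#    info_rate, tag_chosen, tag_class]
# Downstream rows are stored as [raw_jpeg] + info, so items[4] is the embedding
# capacity and items[7] is tag_chosen (see rddembed_ILS below).
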
def rddembed_ILS(row, rate=None):
    """
    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
    return:
        newrow = ('row2', [34, 5400, 'embedded'])
    """
    items = row[1]
    capacity, chosen = int(items[4]), int(items[7])
    if chosen == 0:
        return None
    tmpf_src = None
    tmpf_dst = None
    try:
        tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf_src.write(items[0])
        tmpf_src.seek(0)
        tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')

        steger = F5.F5(sample_key, 1)

        if rate is None:
            embed_rate = steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                                               tmpf_dst.name)
        else:
            assert 0 <= rate < 1
            # print capacity
            hidden = np.random.bytes(int(int(capacity) * rate) / 8)
            embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)

        tmpf_dst.seek(0)
        raw = tmpf_dst.read()
        index = md5(raw).hexdigest()
        return (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))
    except Exception as e:
        print e
        raise
    finally:
        if tmpf_src is not None:
            tmpf_src.close()
        if tmpf_dst is not None:
            tmpf_dst.close()
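
# Hedged sketch of the payload sizing above: with rate = 0.1 and capacity = 4096
# bits, the random payload is int(4096 * 0.1) / 8 = 51 bytes (Python 2 integer
# division), which F5 then embeds from memory:
#
#   hidden = np.random.bytes(int(4096 * 0.1) / 8)   # 51 random bytes
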
def rddembed_ILS_EXT(row, rate=None):
    """
    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
    return:
        newrow = ('row2', [34, 5400, 'embedded']) or NULL
        [row, newrow]
    """
    items = row[1]
    capacity, chosen = int(items[4]), int(items[7])
    if chosen == 0:
        return [row]
    tmpf_src = None
    tmpf_dst = None
    try:
        tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf_src.write(items[0])
        tmpf_src.seek(0)
        tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')

        steger = F5.F5(sample_key, 1)

        if rate is None:
            embed_rate = steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                                               tmpf_dst.name)
        else:
            assert 0 <= rate < 1
            # print capacity
            hidden = np.random.bytes(int(int(capacity) * rate) / 8)
            embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)

        tmpf_dst.seek(0)
        raw = tmpf_dst.read()
        index = md5(raw).hexdigest()
        return [row, (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))]
    except Exception as e:
        print e
        raise
    finally:
        if tmpf_src is not None:
            tmpf_src.close()
        if tmpf_dst is not None:
            tmpf_dst.close()
def _get_feat(image, feattype='ibd', **kwargs):
    if feattype == 'ibd':
        feater = IntraBlockDiff.FeatIntraBlockDiff()
    else:
        raise Exception("Unknown feature type!")

    desc = feater.feat(image)

    return desc


def rddfeat_ILS(items, feattype='ibd', **kwargs):
    tmpf_src = None
    try:
        tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf_src.write(items[0])
        tmpf_src.seek(0)

        desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
        # print 'desccccccccccccccccccc', desc
        return items + [desc]
    except Exception as e:
        print e
        raise
    finally:
        if tmpf_src is not None:
            tmpf_src.close()
def format_out(row, cols, withdata=False):
    """
    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
             cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return:
        [('row1', ['row1', 'cf_info', 'id', '1']),
         ('row1', ['row1', 'cf_info', 'size', '3400']),
         ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
    """
    puts = []
    key = row[0]
    # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
    #     print row
    if not withdata:
        for data, col in zip(row[1][1:], cols[1:]):
            puts.append((key, [key] + col + [str(data)]))
    else:
        for data, col in zip(row[1], cols):
            puts.append((key, [key] + col + [str(data)]))
    return puts
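
# A hedged reading of the withdata flag: with withdata=False the first element of
# row[1] (the raw image bytes) and its column are skipped, so only metadata and
# tag columns are written back; with withdata=True every column is emitted,
# image data included.
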
class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)

        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize", "10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        #     .set("spark.executor.extraClassPath", extraClassPath) \
        #     .set("SPARK_CLASSPATH", extraClassPath) \
        #     .set("spark.driver.memory", "1G") \
        #     .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)
        self.model = None

    def read_hbase(self, table_name, func=None, collect=False):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
        }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(lambda x: parser(x))

        if collect:
            return hbase_rdd.collect()
        else:
            return hbase_rdd
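
    # A hedged usage sketch (the table name 'ilsvrc_table' is hypothetical; the
    # parser is one of the module-level rddparse_* helpers above):
    #
    #   sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
    #   rdd = sparker.read_hbase('ilsvrc_table', func=rddparse_dataset_ILS)  # -> (tag, feat) pairs
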
    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']

        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {
            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
            # "hbase.zookeeper.quorum": self.host,
            "hbase.mapreduce.inputtable": table_name,
            "hbase.mapred.outputtable": table_name,
            "mapreduce.outputformat.class": hparams["outputFormatClass"],
            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
            "mapreduce.job.output.value.class": hparams["writeValueClass"],
        }

        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data

        rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])

    def train_svm(self, X, Y=None):
        if Y is None:
            # From rdd_labeled
            assert isinstance(X, RDD)
            svm = SVMWithSGD.train(X)
        else:
            # data = []
            # for feat, tag in zip(X, Y):
            #     data.append(LabeledPoint(tag, feat))
            # svm = SVMWithSGD.train(self.sc.parallelize(data))
            hdd_data = self.sc.parallelize(zip(X, Y), 20).map(lambda x: LabeledPoint(x[1], x[0]))
            svm = SVMWithSGD.train(hdd_data)
        self.model = svm
        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     model = pickle.dump(svm, modelfile)

        return self.model
    def predict_svm(self, x, collect=False, model=None):
        """
        From pyspark.mllib.classification:

            >> svm.predict([1.0])
            1
            >> svm.predict(sc.parallelize([[1.0]])).collect()
            [1]
            >> svm.clearThreshold()
            >> svm.predict(array([1.0]))
            1.25...
        """
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")

        res = model.predict(x)
        if collect:
            return res.collect()
        else:
            return res

    def test_svm(self, X, Y=None, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")

        if Y is None:
            assert isinstance(X, RDD)
            pass
        else:
            result_Y = np.array(self.predict_svm(X, collect=True))
            return np.mean(Y == result_Y)
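
# A hedged end-to-end sketch of how the pieces above fit together (the table
# name and the test_feats/test_tags variables are illustrative, not defined in
# this module):
#
#   sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
#   rdd = sparker.read_hbase('ilsvrc_table', func=rddparse_dataset_ILS)
#   data = rdd.map(lambda (tag, feat): LabeledPoint(tag, feat))
#   model = sparker.train_svm(data)
#   accuracy = sparker.test_svm(sparker.sc.parallelize(test_feats), test_tags, model=model)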