# -*- coding: utf-8 -*-
__author__ = 'chunk'
from ..common import *
from .dependencies import *
from . import *
# from ..mdata import MSR, CV, ILSVRC, ILSVRC_S
from ..mjpeg import *
from ..msteg import *
from ..msteg.steganography import LSB, F3, F4, F5
from ..mfeat import IntraBlockDiff
|
ca73c96f
Chunk
Transformed into ...
|
14
|
import os  # os is used below (package_dir, embed paths); it may also come in via the wildcard imports
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
import json
import pickle
import tempfile
import numpy as np
from scipy import stats
from hashlib import md5
np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))
|
|
def rddparse_data_CV(raw_row):
    """
    Parse one raw HBase row into a (feature, tag) pair.

    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ([0.056273,...], 1)
    """
    data = raw_row[1].split('--%--')
    feat = json.loads(data[0].split(':')[-1])
    tag = 1 if data[-1].split(':')[-1] == 'True' else 0
    return (feat, tag)
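
# A minimal sketch of rddparse_data_CV on a hand-made row (values are
# illustrative, not from a real table):
#
#   row = (u'key0', u'cf_feat:hog:[0.05, 0.12]--%--cf_tag:hog:True')
#   rddparse_data_CV(row)   # -> ([0.05, 0.12], 1)
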
def rddparse_data_ILS(raw_row):
    """
    Parse one raw HBase row into a (key, image-data) pair.

    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ('key0', <raw image data>)

    In fact we could also use mapValues here.
    """
    key = raw_row[0]
    # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
    #     with open('/tmp/hhhh', 'wb') as f:
    #         f.write(raw_row[1].decode('unicode-escape').encode('latin-1'))
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
    data = items[0].split('cf_pic:data:')[-1]
    return (key, data)


def rddparse_all_ILS(raw_row):
    """
    Deprecated.
    """
    key = raw_row[0]
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
    # @TODO
    # N.B. "ValueError: No JSON object could be decoded" happens here because
    # the spark-hbase IO is string-based.
    data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]]
    return (key, data)
|
def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None):
"""
|
8bddd8b3
Chunk
You guess what? T...
|
74
|
Tempfile is our friend. (?)
|
3b4e250d
Chunk
staged.
|
75
76
77
|
"""
info_rate = info_rate if info_rate != None else 0.0
tag_chosen = tag_chosen if tag_chosen != None else stats.bernoulli.rvs(0.8)
    tag_class = tag_class if tag_class is not None else 0
    try:
        tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf.write(img)
        tmpf.seek(0)
        im = Jpeg(tmpf.name, key=sample_key)
        info = [
            im.image_width,
            im.image_height,
            im.image_width * im.image_height,
            im.getCapacity(),
            im.getQuality(),
            info_rate,
            tag_chosen,
            tag_class
        ]
        return info
    except Exception as e:
        print e
        raise
    finally:
        tmpf.close()
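
# A minimal sketch of how rddinfo_ILS is meant to be called (assuming `img`
# holds raw JPEG bytes, e.g. read from a local file; values illustrative):
#
#   with open('sample.jpg', 'rb') as f:
#       img = f.read()
#   info = rddinfo_ILS(img)
#   # -> [width, height, width*height, capacity, quality, 0.0, tag_chosen, 0]
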
def rddembed_ILS(row, rate=None):
    """
    Embed a random payload into one row's image via F5 steganography.

    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
    return:
        newrow = ('row2', [34, 5400, 'embedded']), or None if the row
        was not chosen for embedding.
    """
    items = row[1]
    capacity, chosen = int(items[4]), int(items[7])
    if chosen == 0:
        return None
    try:
        tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf_src.write(items[0])
        tmpf_src.seek(0)
        tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        steger = F5.F5(sample_key, 1)
        if rate is None:
            embed_rate = steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                                               tmpf_dst.name)
        else:
            assert 0 <= rate < 1
            # print capacity
            hidden = np.random.bytes(int(int(capacity) * rate) / 8)
            embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)
|
        tmpf_dst.seek(0)
        raw = tmpf_dst.read()
        index = md5(raw).hexdigest()
        return (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))
    except Exception as e:
        print e
        raise
    finally:
        tmpf_src.close()
        tmpf_dst.close()
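
# A hedged sketch of rddembed_ILS in a pipeline (names are illustrative).
# Each row is (key, [img_bytes] + info), as produced by rddinfo_ILS:
#
#   rdd_embedded = rdd.map(lambda x: rddembed_ILS(x, rate=0.1)) \
#                     .filter(lambda x: x is not None)  # rows not chosen return None
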
def _get_feat(image, feattype='ibd', **kwargs):
    if feattype == 'ibd':
        feater = IntraBlockDiff.FeatIntraBlockDiff()
    else:
        raise Exception("Unknown feature type!")
    desc = feater.feat(image)
    return desc
|
def rddfeat_ILS(items, feattype='ibd', **kwargs):
    try:
        tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
        tmpf_src.write(items[0])
        tmpf_src.seek(0)
        desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
        # print 'desccccccccccccccccccc', desc
        return items + [desc]
    except Exception as e:
        print e
        raise
    finally:
        tmpf_src.close()
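
# Continuing the sketch above, features can be appended to each row's value
# list ('ibd' is the only feature type implemented here):
#
#   rdd_feat = rdd_embedded.mapValues(lambda items: rddfeat_ILS(items))
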
def format_out(row, cols, withdata=False):
    """
    Convert one row into per-column HBase put tuples.

    input:
        e.g. row = ('row1', [1, 3400, 'hello'])
             cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return:
        [('row1', ['row1', 'cf_info', 'id', '1']),
         ('row1', ['row1', 'cf_info', 'size', '3400']),
         ('row1', ['row1', 'cf_tag', 'desc', 'hello'])]
    """
    puts = []
    key = row[0]
    # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
    #     print row
    if not withdata:
        for data, col in zip(row[1][1:], cols[1:]):
            puts.append((key, [key] + col + [str(data)]))
    else:
        for data, col in zip(row[1], cols):
            puts.append((key, [key] + col + [str(data)]))
    return puts


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)
        # self.conf.set("spark.akka.frameSize", "10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        #     .set("spark.executor.extraClassPath", extraClassPath) \
# .set("SPARK_CLASSPATH", extraClassPath) \
|
# .set("spark.driver.memory", "1G") \
# .set("spark.yarn.jar", sparkJar)
self.sc = SparkContext(conf=self.conf)
self.model = None
def read_hbase(self, table_name, func=None, collect=False):
"""
ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
|
|
        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']
        """
hconf = {"hbase.zookeeper.quorum": self.host,
|
"hbase.mapreduce.inputtable": table_name,
}
hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
keyClass=hparams["readKeyClass"],
valueClass=hparams["readValueClass"],
keyConverter=hparams["readKeyConverter"],
|
valueConverter=hparams["readValueConverter"],
conf=hconf)
|
        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(lambda x: parser(x))
        if collect:
            return hbase_rdd.collect()
        else:
            return hbase_rdd
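
    # A hedged usage sketch for read_hbase (table name illustrative):
    #
    #   sparker = Sparker(host='HPC-server')
    #   samples = sparker.read_hbase('ILSVRC-table', func=rddparse_data_CV,
    #                                collect=True)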
    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data (from dictionary):
            e.g. data = {'row1': [1, 3400, 'hello'], 'row2': [34, 5000, 'here in mine']},
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']

        Data (from RDD):
            e.g. data = [('row1', [1, 3400, 'hello']), ('row2', [34, 5000, 'here in mine'])],
                 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
"""
|
hconf = {"hbase.zookeeper.quorum": self.host,
|
"hbase.mapreduce.inputtable": table_name,
|
"hbase.mapred.outputtable": table_name,
"mapreduce.outputformat.class": hparams["outputFormatClass"],
"mapreduce.job.output.key.class": hparams["writeKeyClass"],
"mapreduce.job.output.value.class": hparams["writeValueClass"],
}
|
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            # NOTE: a bare dict would parallelize to its keys only, so take
            # the (key, value) pairs for dict input; lists pass through.
            rdd_data = self.sc.parallelize(data.items() if isinstance(data, dict) else data)
        else:
            rdd_data = data
        rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])
    def train_svm(self, rdd_labeled):
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm
        return svm
|
    def train_svm_vec(self, X, Y):
        # Renamed from train_svm: a second train_svm here would silently
        # shadow the RDD-based variant above.
        # data = []
        # for feat, tag in zip(X, Y):
        #     data.append(LabeledPoint(tag, feat))
        # svm = SVMWithSGD.train(self.sc.parallelize(data))
        hdd_data = self.sc.parallelize(zip(X, Y), 20).map(lambda x: LabeledPoint(x[1], x[0]))
        svm = SVMWithSGD.train(hdd_data)
        self.model = svm
        # with open('res/svm_spark.model', 'wb') as modelfile:
        #     model = pickle.dump(svm, modelfile)
        return svm

    def predict_svm(self, x, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                #     model = pickle.load(modelfile)
                raise Exception("No model available!")
        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        pass
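

# A hedged end-to-end sketch (host and data are illustrative):
#
#   sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
#   X = [[0.1, 0.2], [0.3, 0.4]]   # feature vectors
#   Y = [0, 1]                     # labels
#   model = sparker.train_svm_vec(X, Y)
#   print sparker.predict_svm([0.1, 0.2])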