mspark/SC.py
# -*- coding: utf-8 -*-
__author__ = 'chunk'

from ..common import *
from .dependencies import *
from . import *
# from ..mdata import MSR, CV, ILSVRC, ILSVRC_S

from ..mjpeg import *
from ..msteg import *
from ..msteg.steganography import LSB, F3, F4, F5
from ..mfeat import IntraBlockDiff

import os  # used below for path handling; imported explicitly in case ..common does not re-export it
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
import json
import pickle
import tempfile

import numpy as np
from scipy import stats
from hashlib import md5

np.random.seed(sum(map(ord, "whoami")))
package_dir = os.path.dirname(os.path.abspath(__file__))


def rddparse_data_CV(raw_row):
    """
    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: ([0.056273,...],1)
    """
    data = raw_row[1].split('--%--')
    feat = json.loads(data[0].split(':')[-1])
    tag = 1 if data[-1].split(':')[-1] == 'True' else 0
    return (feat, tag)
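
# Hedged example of the mapping above (the feature values are made up, not
# taken from a real table):
#
#   >>> rddparse_data_CV((u'key0', u'cf_feat:hog:[0.05, 0.12]--%--cf_tag:hog:True'))
#   ([0.05, 0.12], 1)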


def rddparse_data_ILS(raw_row):
    """
    input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
    return: (key, raw_image_data)

    In fact we can also use mapValues.
    """
    key = raw_row[0]
    # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
    # with open('/tmp/hhhh','wb') as f:
    # f.write(raw_row[1].decode('unicode-escape')).encode('latin-1')
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')
    data = items[0].split('cf_pic:data:')[-1]
    return (key, data)


def rddparse_all_ILS(raw_row):
    """
    Deprecated
    """
    key = raw_row[0]
    items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--')

    # @TODO
    # N.B. "ValueError: No JSON object could be decoded" can occur here because the spark-hbase IO is string-based.
    data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]]

    return (key, data)


def rddinfo_ILS(img, info_rate=None, tag_chosen=None, tag_class=None):
    """
    Dump the raw bytes to a temporary file so Jpeg can read them from disk.
    """
    info_rate = info_rate if info_rate is not None else 0.0
    tag_chosen = tag_chosen if tag_chosen is not None else stats.bernoulli.rvs(0.8)
    tag_class = tag_class if tag_class is not None else 0
    # Create the tempfile before the try block so the finally clause can
    # always close it.
    tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
    try:
        tmpf.write(img)
        tmpf.seek(0)
        im = Jpeg(tmpf.name, key=sample_key)
        info = [
            im.image_width,
            im.image_height,
            im.image_width * im.image_height,
            im.getCapacity(),
            im.getQuality(),
            info_rate,
            tag_chosen,
            tag_class
        ]
        return info
    except Exception as e:
        print e
        raise
    finally:
        tmpf.close()


def rddembed_ILS(row, rate=None):
    """
    input:
        e.g. row =('row1',[1,3400,'hello'])
    return:
        newrow = ('row2',[34,5400,'embeded'])
    """
    items = row[1]
    # items layout: [raw, width, height, size, capacity, quality, rate, chosen, class]
    capacity, chosen = int(items[4]), int(items[7])
    if chosen == 0:
        return None
    # Create both tempfiles before the try block so the finally clause can
    # always close them.
    tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
    tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
    try:
        tmpf_src.write(items[0])
        tmpf_src.seek(0)

        steger = F5.F5(sample_key, 1)

        if rate is None:
            embed_rate = steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'),
                                               tmpf_dst.name)
        else:
            assert 0 <= rate < 1
            # capacity appears to be in bits, hence the // 8 to get a byte count
            hidden = np.random.bytes(int(capacity * rate) // 8)
            embed_rate = steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True)

        tmpf_dst.seek(0)
        raw = tmpf_dst.read()
        index = md5(raw).hexdigest()

        return (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))

    except Exception as e:
        print e
        raise
    finally:
        tmpf_src.close()
        tmpf_dst.close()
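
# rddembed_ILS returns None for rows that were not chosen for embedding, so a
# hedged usage sketch (an `rdd` of (key, [raw] + info) rows is assumed) is:
#
#   stego_rdd = rdd.map(lambda row: rddembed_ILS(row, rate=0.1)) \
#                  .filter(lambda row: row is not None)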


def _get_feat(image, feattype='ibd', **kwargs):
    if feattype == 'ibd':
        feater = IntraBlockDiff.FeatIntraBlockDiff()
    else:
        raise Exception("Unknown feature type!")

    desc = feater.feat(image)

    return desc


def rddfeat_ILS(items, feattype='ibd', **kwargs):
    # Create the tempfile before the try block so the finally clause can
    # always close it.
    tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b')
    try:
        tmpf_src.write(items[0])
        tmpf_src.seek(0)

        desc = json.dumps(_get_feat(tmpf_src.name, feattype=feattype).tolist())
        # print 'desccccccccccccccccccc',desc
        return items + [desc]

    except Exception as e:
        print e
        raise
    finally:
        tmpf_src.close()


def format_out(row, cols, withdata=False):
    """
    input:
        e.g. row =('row1',[1,3400,'hello'])
            cols = [['cf_info', 'id'], ['cf_info', 'size'], ['cf_tag', 'desc']]
    return:
        [('row1',['row1', 'cf_info', 'id', '1']),('row1',['row1', 'cf_info', 'size', '3400']),('row1',['row1', 'cf_tag', 'desc', 'hello'])]
    """
    puts = []
    key = row[0]
    # if key == '04650c488a2b163ca8a1f52da6022f03.jpg':
    # print row
    if not withdata:
        for data, col in zip(row[1][1:], cols[1:]):
            puts.append((key, [key] + col + [str(data)]))
    else:
        for data, col in zip(row[1], cols):
            puts.append((key, [key] + col + [str(data)]))
    return puts


class Sparker(object):
    def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
        load_env()
        self.host = host
        self.appname = appname
        self.master = kwargs.get('master', 'spark://%s:7077' % self.host)
        self.conf = SparkConf()
        self.conf.setSparkHome(self.host) \
            .setMaster(self.master) \
            .setAppName(self.appname)

        # self.conf.set("spark.akka.frameSize","10685760")
        # self.conf.set("spark.driver.extraClassPath", extraClassPath) \
        # .set("spark.executor.extraClassPath", extraClassPath) \
        # .set("SPARK_CLASSPATH", extraClassPath) \
        # .set("spark.driver.memory", "1G") \
        # .set("spark.yarn.jar", sparkJar)

        self.sc = SparkContext(conf=self.conf)

        self.model = None


    def read_hbase(self, table_name, func=None, collect=False):
        """
        ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data

        Filter format:
            columns=['cf1:col1', 'cf1:col2']
            or
            columns=['cf1']

        """

        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 }

        hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                            keyClass=hparams["readKeyClass"],
                                            valueClass=hparams["readValueClass"],
                                            keyConverter=hparams["readKeyConverter"],
                                            valueConverter=hparams["readValueConverter"],
                                            conf=hconf)

        parser = func if func is not None else rddparse_data_CV
        hbase_rdd = hbase_rdd.map(lambda x: parser(x))

        if collect:
            return hbase_rdd.collect()
        else:
            return hbase_rdd
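
    # Hedged sketch: `hparams` is defined by the package-level imports, not in
    # this file. With the stock Spark 1.x HBase examples it would look roughly
    # like this (the converter classes ship in the Spark examples jar):
    #
    #   hparams = {
    #       "inputFormatClass": "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    #       "readKeyClass": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    #       "readValueClass": "org.apache.hadoop.hbase.client.Result",
    #       "readKeyConverter": "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    #       "readValueConverter": "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    #   }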

    def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
        """
        Data Format: (Deprecated)
            e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]

        Data(from dictionary):
            e.g. data ={'row1':[1,3400,'hello'], 'row2':[34,5000,'here in mine']},
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        Data(from Rdd):
            e.g. data =[('row1',[1,3400,'hello']), ('row2',[34,5000,'here in mine'])],
                cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
        """
        hconf = {"hbase.zookeeper.quorum": self.host,
                 "hbase.mapreduce.inputtable": table_name,
                 "hbase.mapred.outputtable": table_name,
                 "mapreduce.outputformat.class": hparams["outputFormatClass"],
                 "mapreduce.job.output.key.class": hparams["writeKeyClass"],
                 "mapreduce.job.output.value.class": hparams["writeValueClass"],
                 }
        cols = [col.split(':') for col in columns]
        if not fromrdd:
            rdd_data = self.sc.parallelize(data)
        else:
            rdd_data = data

        rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
            conf=hconf,
            keyConverter=hparams["writeKeyConverter"],
            valueConverter=hparams["writeValueConverter"])
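
        # Hedged usage sketch (the table and column names below are
        # placeholders, not taken from this file):
        #
        #   sparker.write_hbase('ILSVRC_S_TEST', stego_rdd, fromrdd=True,
        #                       columns=['cf_pic:data', 'cf_info:width'],
        #                       withdata=True)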


    def train_svm_rdd(self, rdd_labeled):
        # Variant that consumes a ready-made RDD of LabeledPoint; train_svm
        # below builds that RDD from plain X/Y sequences.
        svm = SVMWithSGD.train(rdd_labeled)
        self.model = svm

        return svm

    def train_svm(self, X, Y):

        # data = []
        # for feat, tag in zip(X, Y):
        # data.append(LabeledPoint(tag, feat))
        # svm = SVMWithSGD.train(self.sc.parallelize(data))

        rdd_data = self.sc.parallelize(zip(X, Y), 20).map(lambda x: LabeledPoint(x[1], x[0]))
        svm = SVMWithSGD.train(rdd_data)

        self.model = svm

        # with open('res/svm_spark.model', 'wb') as modelfile:
        # model = pickle.dump(svm, modelfile)

        return svm

    def predict_svm(self, x, model=None):
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                # with open('res/svm_spark.model', 'rb') as modelfile:
                # model = pickle.load(modelfile)
                raise Exception("No model available!")

        return model.predict(x)

    def test_svm(self, X, Y, model=None):
        pass
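

# Hedged end-to-end sketch. The host and app name are this class's defaults;
# the table name is a placeholder, not taken from this file.
#
#   if __name__ == '__main__':
#       sparker = Sparker(host='HPC-server', appname='NewPySparkApp')
#       pairs = sparker.read_hbase('SOME-TABLE', func=rddparse_data_CV,
#                                  collect=True)
#       X = [feat for feat, tag in pairs]
#       Y = [tag for feat, tag in pairs]
#       model = sparker.train_svm(X, Y)
#       print model.predict(X[0])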