__author__ = 'chunk' from . import * from ..mfeat import HOG, IntraBlockDiff from ..mspark import SC from pyspark.mllib.regression import LabeledPoint from ..common import * import os, sys from hashlib import md5 import csv import json import happybase from ..mjpeg import * from ..msteg import * from ..msteg.steganography import LSB, F3, F4, F5 import numpy as np from scipy import stats import tempfile np.random.seed(sum(map(ord, "whoami"))) package_dir = os.path.dirname(os.path.abspath(__file__)) class DataILSVRC_S(DataDumperBase): """ This module is specially for ILSVRC data processing under spark & hbase. We posit that the DB(e.g. HBase) has only the images data with md5 name as id. The task is to gennerate info(size,capacity,quality,etc.) and class & chosen tags, and then to perform embedding and finally to calcculate ibd features. Each step includes reading from & writing to Hbase (though PC). And each step must have a 'spark' mode option, which means that the operation is performed by spark with reading & wrting through RDDs. copyright(c) 2015 chunkplus@gmail.com """ def __init__(self, base='ILSVRC2013_DET_val', category='Train_1'): DataDumperBase.__init__(self, base, category) self.base = base self.category = category self.dict_data = {} self.rdd_data = None self.table_name = self.base.strip('/').split('/')[-1] if self.category != None: self.table_name += ('-' + self.category) self.sparker = None self.steger = F5.F5(sample_key, 1) def get_table(self): if self.table != None: return self.table if self.connection is None: c = happybase.Connection('HPC-server') self.connection = c tables = self.connection.tables() if self.table_name not in tables: families = {'cf_pic': dict(), 'cf_info': dict(max_versions=10), 'cf_tag': dict(), 'cf_feat': dict(), } self.connection.create_table(name=self.table_name, families=families) table = self.connection.table(name=self.table_name) self.table = table return table def delete_table(self, table_name=None, disable=True): if table_name == None: table_name = self.table_name if self.connection is None: c = happybase.Connection('HPC-server') self.connection = c tables = self.connection.tables() if table_name not in tables: return False else: try: self.connection.delete_table(table_name, disable) except: print 'Exception when deleting table.' raise return True def _get_info(self, img, info_rate=None, tag_chosen=None, tag_class=None): """ Tempfile is our friend. (?) """ info_rate = info_rate if info_rate != None else 0.0 tag_chosen = tag_chosen if tag_chosen != None else stats.bernoulli.rvs(0.8) tag_class = tag_class if tag_class != None else 0 try: tmpf = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') tmpf.write(img) tmpf.seek(0) im = Jpeg(tmpf.name, key=sample_key) info = [ im.image_width, im.image_height, im.image_width * im.image_height, im.getCapacity(), im.getQuality(), info_rate, tag_chosen, tag_class ] return info except Exception as e: print e finally: tmpf.close() def _get_feat(self, image, feattype='ibd', **kwargs): size = kwargs.get('size', (48, 48)) if feattype == 'hog': feater = HOG.FeatHOG(size=size) elif feattype == 'ibd': feater = IntraBlockDiff.FeatIntraBlockDiff() else: raise Exception("Unknown feature type!") desc = feater.feat(image) return desc def _rddparse_data(raw_row): """ input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True') return: ([0.056273,...],1) In fact we can also use mapValues. """ key = raw_row[0] # if key == '04650c488a2b163ca8a1f52da6022f03.jpg': # with open('/tmp/hhhh','wb') as f: # f.write(raw_row[1].decode('unicode-escape')).encode('latin-1') items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--') data = items[0].split('cf_pic:data:')[-1] return (key, data) def _rddparse_all(raw_row): key = raw_row[0] items = raw_row[1].decode('unicode-escape').encode('latin-1').split('--%--') data = [items[0].split('cf_pic:data:')[-1]] + [json.loads(item.split(':')[-1]) for item in items[1:]] return (key, data) def _rdd_embed(self, row): """ input: e.g. row =('row1',[1,3400,'hello']) return: newrow = ('row2',[34,5400,'embeded']) """ items = row[1] capacity, rate, chosen = items[4], items[6], items[7] if chosen == 0: return None try: tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') tmpf_src.write(items[0]) tmpf_src.seek(0) tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') if rate == None: embed_rate = self.steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'), tmpf_dst.name) else: assert (rate >= 0 and rate < 1) # print capacity hidden = np.random.bytes(int(int(capacity) * rate) / 8) embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True) tmpf_dst.seek(0) raw = tmpf_dst.read() index = md5(raw).hexdigest() return (index + '.jpg', [raw] + self._get_info(raw, embed_rate, 0, 1)) except Exception as e: print e raise finally: tmpf_src.close() tmpf_dst.close() def _extract_data(self, mode='hbase', writeback=False, withdata=True): """ Get info barely out of image data. """ if mode == 'hbase': if self.table == None: self.table = self.get_table() cols = ['cf_pic:data'] for key, data in self.table.scan(columns=cols): data = data['cf_pic:data'] self.dict_data[key] = [data] + self._get_info(data) if not writeback: return self.dict_data else: try: with self.table.batch(batch_size=5000) as b: for imgname, imginfo in self.dict_data.items(): b.put(imgname, { # 'cf_pic:data': imginfo[0], 'cf_info:width': str(imginfo[1]), 'cf_info:height': str(imginfo[2]), 'cf_info:size': str(imginfo[3]), 'cf_info:capacity': str(imginfo[4]), 'cf_info:quality': str(imginfo[5]), 'cf_info:rate': str(imginfo[6]), 'cf_tag:chosen': str(imginfo[7]), 'cf_tag:class': str(imginfo[8]), }) except ValueError: raise elif mode == 'spark': if self.sparker == None: self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') cols = [ 'cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size', 'cf_info:capacity', 'cf_info:quality', 'cf_info:rate', 'cf_tag:chosen', 'cf_tag:class' ] # # Debug # tmp_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS, # collect=False) # # tmp_data = tmp_data.mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) # print tmp_data.collect()[0][1] # return self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_data_ILS, collect=False).mapValues( lambda data: [data] + SC.rddinfo_ILS(data)) if not writeback: return self.rdd_data else: self.sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=withdata) else: raise Exception("Unknown mode!") def _embed_data(self, mode='hbase', rate=None, readforward=False, writeback=False, withdata=True): if mode == 'hbase': if self.table == None: self.table = self.get_table() if readforward: self.dict_data = {} cols = [ 'cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size', 'cf_info:capacity', 'cf_info:quality', 'cf_info:rate', 'cf_tag:chosen', 'cf_tag:class' ] for key, data in self.table.scan(columns=cols): data = [data[k] for k in cols] self.dict_data[key] = data dict_data_ext = {} for imgname, imgdata in self.dict_data.items(): capacity, chosen = int(imgdata[4]), int(imgdata[7]) if chosen == 0: continue try: tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') tmpf_src.write(imgdata[0]) tmpf_src.seek(0) tmpf_dst = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') if rate == None: embed_rate = self.steger.embed_raw_data(tmpf_src.name, os.path.join(package_dir, '../res/toembed'), tmpf_dst.name) else: assert (rate >= 0 and rate < 1) # print capacity hidden = np.random.bytes(int(capacity * rate) / 8) embed_rate = self.steger.embed_raw_data(tmpf_src.name, hidden, tmpf_dst.name, frommem=True) tmpf_dst.seek(0) raw = tmpf_dst.read() index = md5(raw).hexdigest() dict_data_ext[index + '.jpg'] = [raw] + self._get_info(raw, embed_rate, 0, 1) except Exception as e: print e raise finally: tmpf_src.close() tmpf_dst.close() self.dict_data.update(dict_data_ext) if not writeback: return self.dict_data else: try: with self.table.batch(batch_size=5000) as b: for imgname, imginfo in dict_data_ext.items(): b.put(imgname, { 'cf_pic:data': imginfo[0], 'cf_info:width': str(imginfo[1]), 'cf_info:height': str(imginfo[2]), 'cf_info:size': str(imginfo[3]), 'cf_info:capacity': str(imginfo[4]), 'cf_info:quality': str(imginfo[5]), 'cf_info:rate': str(imginfo[6]), 'cf_tag:chosen': str(imginfo[7]), 'cf_tag:class': str(imginfo[8]), }) except ValueError: raise elif mode == 'spark': if self.sparker == None: self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') cols = [ 'cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size', 'cf_info:capacity', 'cf_info:quality', 'cf_info:rate', 'cf_tag:chosen', 'cf_tag:class' ] if readforward: self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False) rdd_data_ext = self.rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=rate)).filter(lambda x: x != None) self.rdd_data = self.rdd_data.union(rdd_data_ext) if not writeback: return self.rdd_data else: self.sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=withdata) else: raise Exception("Unknown mode!") def _extract_feat(self, mode='hbase', feattype='ibd', readforward=False, writeback=False, withdata=False): if mode == 'hbase': if self.table == None: self.table = self.get_table() if readforward: self.dict_data = {} cols = [ 'cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size', 'cf_info:capacity', 'cf_info:quality', 'cf_info:rate', 'cf_tag:chosen', 'cf_tag:class' ] for key, data in self.table.scan(columns=cols): data = [data[k] for k in cols] self.dict_data[key] = data for imgname, imgdata in self.dict_data.items(): try: tmpf_src = tempfile.NamedTemporaryFile(suffix='.jpg', mode='w+b') tmpf_src.write(imgdata[0]) tmpf_src.seek(0) desc = json.dumps(self._get_feat(tmpf_src.name, feattype=feattype).tolist()) self.dict_data[imgname].append(desc) except Exception as e: print e raise finally: tmpf_src.close() if not writeback: return self.dict_data else: try: with self.table.batch(batch_size=5000) as b: for imgname, imginfo in self.dict_data.items(): b.put(imgname, { 'cf_pic:data': imginfo[0], 'cf_info:width': str(imginfo[1]), 'cf_info:height': str(imginfo[2]), 'cf_info:size': str(imginfo[3]), 'cf_info:capacity': str(imginfo[4]), 'cf_info:quality': str(imginfo[5]), 'cf_info:rate': str(imginfo[6]), 'cf_tag:chosen': str(imginfo[7]), 'cf_tag:class': str(imginfo[8]), 'cf_feat:' + feattype: imginfo[9], }) except ValueError: raise elif mode == 'spark': if self.sparker == None: self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') cols = [ 'cf_pic:data', 'cf_info:width', 'cf_info:height', 'cf_info:size', 'cf_info:capacity', 'cf_info:quality', 'cf_info:rate', 'cf_tag:chosen', 'cf_tag:class', 'cf_feat:' + feattype, ] if readforward: self.rdd_data = self.sparker.read_hbase(self.table_name, func=SC.rddparse_all_ILS, collect=False) self.rdd_data = self.rdd_data.mapValues(lambda items: SC.rddfeat_ILS(items)) # print self.rdd_data.collect()[0] # return if not writeback: return self.rdd_data else: self.sparker.write_hbase(self.table_name, self.rdd_data, fromrdd=True, columns=cols, withdata=withdata) else: raise Exception("Unknown mode!") def format(self): self._extract_data(mode='hbase', writeback=False) self._embed_data(mode='hbase', rate=0.1, readforward=False, writeback=False) self._extract_feat(mode='hbase', feattype='ibd', readforward=False, writeback=True) def load_data(self, mode='hbase', feattype='ibd', tagtype='class', collect=False): INDEX = [] X = [] Y = [] if mode == "hbase": if self.table == None: self.table = self.get_table() col_feat, col_tag = 'cf_feat:' + feattype, 'cf_tag:' + tagtype for key, data in self.table.scan(columns=[col_feat, col_tag]): X.append( [item for sublist in json.loads(data[col_feat]) for subsublist in sublist for item in subsublist]) Y.append(int(data[col_tag])) elif mode == "spark" or mode == "cluster": if self.sparker == None: self.sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077') rdd_dataset = self.sparker.read_hbase(self.table_name, func=SC.rddparse_dataset_ILS, collect=False) if not collect: rdd_dataset = rdd_dataset.map(lambda x: LabeledPoint(x[0], x[1])) return rdd_dataset for tag, feat in rdd_dataset.collect(): X.append(feat) Y.append(tag) else: raise Exception("Unknown mode!") return X, Y