From ea1eb31a0f395ca7810bb29b79184052b23dbdf8 Mon Sep 17 00:00:00 2001 From: Chunk Date: Wed, 8 Apr 2015 20:19:44 +0800 Subject: [PATCH] spark is privileged... we are going to write a special data module to process spark&hbase data. --- mdata/CV.py | 4 ++-- mdata/ILSVRC-S.py | 442 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ mdata/ILSVRC.py | 2 +- mdata/MSR.py | 2 +- mspark/SC.py | 19 ++++++++++++------- 5 files changed, 458 insertions(+), 11 deletions(-) create mode 100644 mdata/ILSVRC-S.py diff --git a/mdata/CV.py b/mdata/CV.py index 62b9d0a..bd584e0 100644 --- a/mdata/CV.py +++ b/mdata/CV.py @@ -89,7 +89,7 @@ class DataCV(DataDumperBase): 'cf_info': dict(max_versions=10), 'cf_tag': dict(), 'cf_feat': dict(), - } + } self.connection.create_table(name=self.table_name, families=families) table = self.connection.table(name=self.table_name) @@ -250,7 +250,7 @@ class DataCV(DataDumperBase): if self.sparkcontex == None: self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} + result = self.sparkcontex.read_hbase(self.table_name, collect=True) # result = {key:[feat,tag],...} for feat, tag in result: X.append(feat) Y.append(tag) diff --git a/mdata/ILSVRC-S.py b/mdata/ILSVRC-S.py new file mode 100644 index 0000000..147b06e --- /dev/null +++ b/mdata/ILSVRC-S.py @@ -0,0 +1,442 @@ +__author__ = 'chunk' + +from . 
import * +from ..mfeat import HOG, IntraBlockDiff +from ..mspark import SC +from ..common import * + +import os, sys +from PIL import Image +from hashlib import md5 +import csv +import shutil +import json +import collections +import happybase + +from ..mjpeg import * +from ..msteg import * +from ..msteg.steganography import LSB, F3, F4, F5 + +import numpy as np +from numpy.random import randn +import pandas as pd +from scipy import stats + +from subprocess import Popen, PIPE, STDOUT + + +np.random.seed(sum(map(ord, "whoami"))) + +package_dir = os.path.dirname(os.path.abspath(__file__)) + + +class DataILSVRCS(DataDumperBase): + def __init__(self, base_dir='/media/chunk/Elements/D/data/ImageNet/img/ILSVRC2013_DET_val', category='Train'): + DataDumperBase.__init__(self, base_dir, category) + + self.base_dir = base_dir + self.category = category + self.data_dir = os.path.join(self.base_dir, self.category) + + self.dst_dir = os.path.join(self.base_dir, 'dst', self.category) + self.list_file = os.path.join(self.dst_dir, 'file-tag.tsv') + self.feat_dir = os.path.join(self.dst_dir, 'Feat') + self.img_dir = os.path.join(self.dst_dir, 'Img') + + self.dict_data = {} + + self.table_name = self.base_dir.strip('/').split('/')[-1] + '-' + self.category + self.sparkcontex = None + + def format(self): + self.extract() + + def _hash_copy(self, image): + if not image.endswith('jpg'): + img = Image.open(image) + img.save('../res/tmp.jpg', format='JPEG') + image = '../res/tmp.jpg' + + with open(image, 'rb') as f: + index = md5(f.read()).hexdigest() + + im = Jpeg(image, key=sample_key) + self.dict_data[index] = [im.image_width, im.image_height, im.image_width * im.image_height, im.getCapacity(), + im.getQuality()] + + # self.dict_data[index] = [im.image_width, im.image_height, os.path.getsize(image), im.getQuality()] + + # origion: + # dir = base_dir + 'Img/Train/' + index[:3] + dir = os.path.join(self.img_dir, index[:3]) + if not os.path.exists(dir): + os.makedirs(dir) + image_path = os.path.join(dir, index[3:] + '.jpg') + # print image_path + + if not os.path.exists(image_path): + shutil.copy(image, image_path) + else: + pass + + def get_feat(self, image, feattype='ibd', **kwargs): + size = kwargs.get('size', (48, 48)) + + if feattype == 'hog': + feater = HOG.FeatHOG(size=size) + elif feattype == 'ibd': + feater = IntraBlockDiff.FeatIntraBlockDiff() + else: + raise Exception("Unknown feature type!") + + desc = feater.feat(image) + + return desc + + + def extract_feat(self, feattype='ibd'): + if feattype == 'hog': + feater = HOG.FeatHOG(size=(48, 48)) + elif feattype == 'ibd': + feater = IntraBlockDiff.FeatIntraBlockDiff() + else: + raise Exception("Unknown feature type!") + + list_image = [] + with open(self.list_file, 'rb') as tsvfile: + tsvfile = csv.reader(tsvfile, delimiter='\t') + for line in tsvfile: + list_image.append(line[0]) + + dict_featbuf = {} + for imgname in list_image: + # if imgtag == 'True': + image = os.path.join(self.img_dir, imgname[:3], imgname[3:] + '.jpg') + desc = feater.feat(image) + dict_featbuf[imgname] = desc + + for imgname, desc in dict_featbuf.items(): + # print imgname, desc + dir = os.path.join(self.feat_dir, imgname[:3]) + if not os.path.exists(dir): + os.makedirs(dir) + featpath = os.path.join(dir, imgname[3:].split('.')[0] + '.' 
+ feattype) + with open(featpath, 'wb') as featfile: + featfile.write(json.dumps(desc.tolist())) + + def _build_list(self, list_file=None): + if list_file == None: + list_file = self.list_file + assert list_file != None + + ordict_img = collections.OrderedDict(sorted(self.dict_data.items(), key=lambda d: d[0])) + + with open(list_file, 'w') as f: + tsvfile = csv.writer(f, delimiter='\t') + for key, value in ordict_img.items(): + tsvfile.writerow([key] + value) + + def _anaylis(self, list_file=None): + if list_file == None: + list_file = self.list_file + assert list_file != None + + df_ILS = pd.read_csv(list_file, names=['hash', 'width', 'height', 'size', 'capacity', 'quality'], sep='\t') + length = df_ILS.shape[0] + df_ILS = df_ILS.sort(['capacity', 'size', 'quality'], ascending=True) + rand_class = stats.bernoulli.rvs(0.8, size=length) + + df_ILS['rate'] = np.zeros(df_ILS.shape[0], np.float64) + df_ILS['chosen'] = rand_class + df_ILS['class'] = np.zeros(length, np.int32) + + df_ILS.to_csv(list_file, header=False, index=False, sep='\t') + + def extract(self): + for path, subdirs, files in os.walk(self.data_dir): + for name in files: + imagepath = os.path.join(path, name) + # print imagepath + try: + self._hash_copy(imagepath) + except: + pass + + self._build_list() + self._anaylis() + + + def _embed_outer(self): + self.dict_data = {} + dict_embedresult = {} + os.environ["CLASSPATH"] = os.path.join(package_dir, "../libs/F5/") + cmd = 'java Embed %s %s -e %s -p password -c "stegan by chunk " -q %d' + + df_ILS = pd.read_csv(self.list_file, + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'chosen', 'class'], + sep='\t') + df_ILS_TARGET = df_ILS[df_ILS['chosen'] == 1] + + for hash, size, quality in zip(df_ILS_TARGET['hash'], df_ILS_TARGET['size'], df_ILS_TARGET['quality']): + path_img = os.path.join(self.img_dir, hash[:3], hash[3:] + '.jpg') + if path_img: + print path_img + p = Popen(cmd % (path_img, 'res/tmp.jpg', 'res/toembed', quality), shell=True, stdout=PIPE, + stderr=STDOUT) + dict_embedresult[hash] = [line.strip('\n') for line in p.stdout.readlines()] + try: + self._hash_copy('res/tmp.jpg') + except: + pass + + with open(self.list_file + '.embed.log', 'wb') as f: + tsvfile = csv.writer(f, delimiter='\t') + for key, value in dict_embedresult.items(): + tsvfile.writerow([key] + value) + + self._build_list(self.list_file + '.embed') + + # merge + df_ILS_EMBED = pd.read_csv(self.list_file + '.embed', names=['hash', 'width', 'height', 'size', 'quality'], + sep='\t') + length = df_ILS_EMBED.shape[0] + df_ILS_EMBED = df_ILS_EMBED.sort(['size', 'quality'], ascending=True) + df_ILS_EMBED['chosen'] = np.zeros(length, np.int32) + df_ILS_EMBED['class'] = np.ones(length, np.int32) + + df_ILS = df_ILS.append(df_ILS_EMBED, ignore_index=True) + df_ILS.to_csv(self.list_file, header=False, index=False, sep='\t') + + def _embed_inner(self, rate=None): + self.dict_data = {} + f5 = F5.F5(sample_key, 1) + tmp_img = os.path.join(package_dir, '../res/tmp.jpg') + df_ILS = pd.read_csv(self.list_file, + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'rate', 'chosen', + 'class'], + sep='\t') + df_ILS_TARGET = df_ILS[df_ILS['chosen'] == 1] + + for hash, capacity in zip(df_ILS_TARGET['hash'], df_ILS_TARGET['capacity']): + path_img = os.path.join(self.img_dir, hash[:3], hash[3:] + '.jpg') + if path_img: + print path_img + if rate == None: + embed_rate = f5.embed_raw_data(path_img, os.path.join(package_dir, '../res/toembed'), tmp_img) + else: + assert (rate >= 0 and rate < 1) + # 
print capacity + hidden = np.random.bytes(int(capacity * rate) / 8) + embed_rate = f5.embed_raw_data(path_img, hidden, tmp_img, frommem=True) + try: + with open(tmp_img, 'rb') as f: + index = md5(f.read()).hexdigest() + im = Jpeg(tmp_img, key=sample_key) + self.dict_data[index] = [im.image_width, im.image_height, im.image_width * im.image_height, + im.getCapacity(), + im.getQuality(), embed_rate] + + dir = os.path.join(self.img_dir, index[:3]) + if not os.path.exists(dir): + os.makedirs(dir) + image_path = os.path.join(dir, index[3:] + '.jpg') + if not os.path.exists(image_path): + shutil.copy(tmp_img, image_path) + else: + pass + except: + pass + + self._build_list(self.list_file + '.embed') + + # merge + df_ILS_EMBED = pd.read_csv(self.list_file + '.embed', + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'rate'], + sep='\t') + + df_ILS_EMBED = df_ILS_EMBED.sort(['rate', 'capacity', 'size', 'quality'], ascending=True) + df_ILS_EMBED['chosen'] = np.zeros(df_ILS_EMBED.shape[0], np.int32) + df_ILS_EMBED['class'] = np.ones(df_ILS_EMBED.shape[0], np.int32) + + # print df_ILS_EMBED.dtypes + # print df_ILS.dtypes + # Form the intersection of two Index objects. Sortedness of the result is not guaranteed + df_ILS = df_ILS.append(df_ILS_EMBED, ignore_index=True) + df_ILS.to_csv(self.list_file, header=False, index=False, sep='\t') + + def embed(self, rate=None): + self._embed_inner(rate) + + def get_table(self): + if self.table != None: + return self.table + + if self.connection is None: + c = happybase.Connection('HPC-server') + self.connection = c + + tables = self.connection.tables() + if self.table_name not in tables: + families = {'cf_pic': dict(), + 'cf_info': dict(max_versions=10), + 'cf_tag': dict(), + 'cf_feat': dict(), + } + self.connection.create_table(name=self.table_name, families=families) + + table = self.connection.table(name=self.table_name) + + self.table = table + + return table + + + def store_img(self): + if self.table == None: + self.table = self.get_table() + + dict_databuf = {} + + with open(self.list_file, 'rb') as tsvfile: + tsvfile = csv.reader(tsvfile, delimiter='\t') + for line in tsvfile: + path_img = os.path.join(self.img_dir, line[0][:3], line[0][3:] + '.jpg') + if path_img: + with open(path_img, 'rb') as fpic: + dict_databuf[line[0] + '.jpg'] = fpic.read() + + try: + with self.table.batch(batch_size=5000) as b: + for imgname, imgdata in dict_databuf.items(): + b.put(imgname, {'cf_pic:data': imgdata}) + except ValueError: + raise + + + def store_info(self, infotype='all'): + if self.table == None: + self.table = self.get_table() + + dict_infobuf = {} + + with open(self.list_file, 'rb') as tsvfile: + tsvfile = csv.reader(tsvfile, delimiter='\t') + for line in tsvfile: + dict_infobuf[line[0] + '.jpg'] = line[1:-2] + + if infotype == 'all': + try: + with self.table.batch(batch_size=5000) as b: + for imgname, imginfo in dict_infobuf.items(): + b.put(imgname, + {'cf_info:width': imginfo[0], 'cf_info:height': imginfo[1], 'cf_info:size': imginfo[2], + 'cf_info:capacity': imginfo[3], + 'cf_info:quality': imginfo[4]}) + except ValueError: + raise + else: + raise Exception("Unknown infotype!") + + + def store_tag(self, tagtype='all'): + if self.table == None: + self.table = self.get_table() + + dict_tagbuf = {} + + with open(self.list_file, 'rb') as tsvfile: + tsvfile = csv.reader(tsvfile, delimiter='\t') + for line in tsvfile: + dict_tagbuf[line[0] + '.jpg'] = line[-2:] + + if tagtype == 'all': + try: + with self.table.batch(batch_size=5000) as b: + for 
imgname, imgtag in dict_tagbuf.items(): + b.put(imgname, {'cf_tag:chosen': imgtag[0], 'cf_tag:class': imgtag[1]}) + except ValueError: + raise + else: + raise Exception("Unknown tagtype!") + + + def store_feat(self, feattype='ibd'): + if self.table == None: + self.table = self.get_table() + + dict_featbuf = {} + for path, subdirs, files in os.walk(self.feat_dir): + for name in files: + featpath = os.path.join(path, name) + # print featpath + with open(featpath, 'rb') as featfile: + imgname = path.split('/')[-1] + name.replace('.' + feattype, '.jpg') + dict_featbuf[imgname] = featfile.read() + + try: + with self.table.batch(batch_size=5000) as b: + for imgname, featdesc in dict_featbuf.items(): + b.put(imgname, {'cf_feat:' + feattype: featdesc}) + except ValueError: + raise + pass + + + def load_data(self, mode='local', feattype='ibd', tagtype='class'): + INDEX = [] + X = [] + Y = [] + + if mode == "local": + + dict_dataset = {} + + with open(self.list_file, 'rb') as tsvfile: + tsvfile = csv.reader(tsvfile, delimiter='\t') + for line in tsvfile: + hash = line[0] + tag = line[-1] + path_feat = os.path.join(self.feat_dir, hash[:3], hash[3:] + '.' + feattype) + if path_feat: + with open(path_feat, 'rb') as featfile: + dict_dataset[hash] = (tag, json.loads(featfile.read())) + + for tag, feat in dict_dataset.values(): + X.append([item for sublist in feat for subsublist in sublist for item in subsublist]) + Y.append(int(tag)) + + elif mode == "remote" or mode == "hbase": + if self.table == None: + self.table = self.get_table() + + col_feat, col_tag = 'cf_feat:' + feattype, 'cf_tag:' + tagtype + for key, data in self.table.scan(columns=[col_feat, col_tag]): + X.append(json.loads(data[col_feat])) + Y.append(1 if data[col_tag] == 'True' else 0) + + elif mode == "spark" or mode == "cluster": + if self.sparkcontex == None: + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') + + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} + for feat, tag in result: + X.append(feat) + Y.append(tag) + + else: + raise Exception("Unknown mode!") + + return X, Y + + + + + + + + + + + + diff --git a/mdata/ILSVRC.py b/mdata/ILSVRC.py index 7e379a2..8d83c44 100644 --- a/mdata/ILSVRC.py +++ b/mdata/ILSVRC.py @@ -419,7 +419,7 @@ class DataILSVRC(DataDumperBase): if self.sparkcontex == None: self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} for feat, tag in result: X.append(feat) Y.append(tag) diff --git a/mdata/MSR.py b/mdata/MSR.py index d479618..7e0a16a 100644 --- a/mdata/MSR.py +++ b/mdata/MSR.py @@ -260,7 +260,7 @@ class DataMSR(DataDumperBase): if self.sparkcontex == None: self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} for key, data in result.items(): X.append(data[0]) Y.append(data[1]) diff --git a/mspark/SC.py b/mspark/SC.py index 2d080b2..74cdc23 100644 --- a/mspark/SC.py +++ b/mspark/SC.py @@ -47,7 +47,7 @@ class Sparker(object): self.model = None - def read_habase(self, table_name, columns=None): + def read_hbase(self, table_name, func=None, collect=False): """ ref - 
http://happybase.readthedocs.org/en/latest/user.html#retrieving-data @@ -59,7 +59,7 @@ class Sparker(object): """ hconf = {"hbase.zookeeper.quorum": self.host, "hbase.mapreduce.inputtable": table_name, - } + } hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"], keyClass=hparams["readKeyClass"], @@ -67,11 +67,16 @@ class Sparker(object): keyConverter=hparams["readKeyConverter"], valueConverter=hparams["readValueConverter"], conf=hconf) - hbase_rdd = hbase_rdd.map(lambda x: parse_cv(x)) - output = hbase_rdd.collect() - return output - def write_habase(self, table_name, data): + parser = func if func != None else parse_cv + hbase_rdd = hbase_rdd.map(lambda x: parser(x)) + + if collect: + return hbase_rdd.collect() + else: + return hbase_rdd + + def write_hbase(self, table_name, data): """ Data Format: e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]] @@ -82,7 +87,7 @@ class Sparker(object): "mapreduce.outputformat.class": hparams["outputFormatClass"], "mapreduce.job.output.key.class": hparams["writeKeyClass"], "mapreduce.job.output.value.class": hparams["writeValueClass"], - } + } self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( conf=hconf, -- libgit2 0.21.2
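
Usage note (outside the patch): the new DataILSVRCS dumper in mdata/ILSVRC-S.py follows the same pipeline as the existing dumpers -- format, embed, extract features, store to HBase, load back. The sketch below is one plausible driver under stated assumptions: the 'mdata' package prefix, the embedding rate of 0.1, and the use of importlib are illustrative (the hyphen in ILSVRC-S.py rules out a plain import statement, and the module's relative imports require loading it as part of the package); the method names themselves come from the patch.

    # Hypothetical driver for the new DataILSVRCS dumper; the package prefix
    # and the embedding rate are assumptions, the methods are from the patch.
    import importlib

    # ILSVRC-S.py has a hyphen in its name, so load it via importlib.
    ilsvrc_s = importlib.import_module('mdata.ILSVRC-S')

    dumper = ilsvrc_s.DataILSVRCS(
        base_dir='/media/chunk/Elements/D/data/ImageNet/img/ILSVRC2013_DET_val',
        category='Train')

    dumper.format()          # hash-copy images, build and analyse file-tag.tsv
    dumper.embed(rate=0.1)   # F5-embed random payloads into the chosen subset
    dumper.extract_feat()    # intra-block-diff features, dumped as JSON per image

    dumper.store_img()       # cf_pic:data
    dumper.store_info()      # cf_info:width/height/size/capacity/quality
    dumper.store_tag()       # cf_tag:chosen, cf_tag:class
    dumper.store_feat()      # cf_feat:ibd

    X, Y = dumper.load_data(mode='hbase')   # or mode='local' / mode='spark'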
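
Usage note (outside the patch): after this change Sparker.read_hbase returns the mapped RDD itself unless collect=True is passed, which is why DataCV.load_data now calls read_hbase(self.table_name, collect=True); a row parser can also be injected through func, with parse_cv staying the default. A minimal sketch, assuming the import path, the table name and the custom parser shown here; the parser only has to accept the single (key, value) element that newAPIHadoopRDD yields, as parse_cv does.

    # Sketch of the renamed Sparker API (read_habase -> read_hbase,
    # write_habase -> write_hbase). Import path, table name and the
    # parse_key_only parser are assumptions for illustration.
    from mspark import SC

    sparker = SC.Sparker(host='HPC-server', appname='ImageCV',
                         master='spark://HPC-server:7077')

    # Default parser (parse_cv), kept as a lazy, distributed RDD:
    rdd = sparker.read_hbase('ILSVRC2013_DET_val-Train')

    # Old read_habase behaviour -- materialize the rows on the driver:
    rows = sparker.read_hbase('ILSVRC2013_DET_val-Train', collect=True)

    # Hypothetical custom parser injected via func:
    def parse_key_only(x):
        key, _ = x
        return key

    keys = sparker.read_hbase('ILSVRC2013_DET_val-Train',
                              func=parse_key_only, collect=True)

    # write_hbase is unchanged apart from the rename; data format as in its
    # docstring:
    sparker.write_hbase('ILSVRC2013_DET_val-Train',
                        [["row8", "f1", "", "caocao cao"],
                         ["row9", "f1", "c1", "asdfg hhhh"]])

Keeping the RDD lazy lets callers chain further transformations on the cluster; callers that want a plain list on the driver (DataCV.load_data in this patch) opt back into the old behaviour with collect=True.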