__author__ = 'chunk'

"""
Yun Q. Shi, et al - A Markov Process Based Approach to Effective Attacking JPEG Steganography
"""

import os
import time
import math
import numpy as np
from msteg.StegBase import *
import mjsteg
import jpegObj
from common import *

import csv
import json
import pickle
import cv2
from sklearn import svm

base_dir = '/home/hadoop/data/HeadShoulder/'


class MPB(StegBase):
    """
    Markov Process Based Steganalysis Algo.

    Builds transition-probability-matrix (TPM) features from JPEG DCT
    coefficients and trains / applies an SVM (scikit-learn or OpenCV) on them.
    """

    def __init__(self):
        StegBase.__init__(self, sample_key)
        # Trained classifier kept in memory, and which backend produced it
        # ('sk' for scikit-learn, 'cv' for OpenCV); both None until training.
        self.model = None
        self.svm = None

    @staticmethod
    def _diff_array_pairs(ciq, T):
        """
        Build the four (current, next) difference-array pairs used by the TPM.

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: clipping threshold applied to the absolute coefficients
        :return: tuple of 4 (first, second) array pairs, in the order
                 horizontal, vertical, main-diagonal, minor-diagonal
        """
        # Magnitudes are clipped to [0, T], so every difference below lies in
        # [-T, T] and can be used directly as a (possibly negative) index into
        # an axis of length 2*T+1.
        ciq = np.absolute(ciq).clip(0, T)

        Fh = ciq[:-1, :-1] - ciq[:-1, 1:]
        Fv = ciq[:-1, :-1] - ciq[1:, :-1]
        Fd = ciq[:-1, :-1] - ciq[1:, 1:]
        Fm = ciq[:-1, 1:] - ciq[1:, :-1]

        return (
            (Fh[:-1, :-1], Fh[:-1, 1:]),
            (Fv[:-1, :-1], Fv[1:, :-1]),
            (Fd[:-1, :-1], Fd[1:, 1:]),
            (Fm[:-1, 1:], Fm[1:, :-1]),
        )

    def _get_trans_prob_mat_orig(self, ciq, T=4):
        """
        Original! Calculate Transition Probability Matrix.

        Reference implementation: counts every (m, n) transition with a
        separate boolean mask, which is very slow. Kept for cross-checking
        get_trans_prob_mat.

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: signed integer, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4)
        """
        TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64)

        for k, (first, second) in enumerate(self._diff_array_pairs(ciq, T)):
            for m in range(-T, T + 1):
                # The denominator depends on m only, so compute it once per row.
                is_m = first == m
                total = np.sum(is_m) * 1.0
                if total == 0:
                    # No occurrences of m: the row stays all-zero.
                    continue
                for n in range(-T, T + 1):
                    # Negative m / n wrap around the axis (Python indexing).
                    TPM[m, n, k] = np.sum(np.logical_and(is_m, second == n)) / total

        return TPM

    def get_trans_prob_mat(self, ciq, T=4):
        """
        Calculate Transition Probability Matrix.

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: signed integer, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4)
        """
        TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64)

        for k, (first, second) in enumerate(self._diff_array_pairs(ciq, T)):
            # np.add.at accumulates repeated (m, n) index pairs; a plain
            # fancy-indexed `TPM[m, n, k] += 1` would count each pair only once.
            np.add.at(TPM, (first.ravel(), second.ravel(), k), 1)

        # Each row's total equals the number of occurrences of m in the
        # "first" array, i.e. the normalisation denominator.
        totals = TPM.sum(axis=1, keepdims=True)
        totals[totals == 0] = 1.0  # rows with no observations stay all-zero
        TPM /= totals

        return TPM

    def load_dataset(self, mode, file):
        """
        Load a dataset from the given backend.

        :param mode: 'local' for the file system, 'remote' or 'hbase' for HBase
        :param file: list-file name (local) or table name (hbase)
        :return: whatever the selected loader returns
        :raises ValueError: if mode is not one of the known values
        """
        if mode == 'local':
            return self._load_dataset_from_local(file)
        if mode == 'remote' or mode == 'hbase':
            return self._load_dataset_from_hbase(file)
        raise ValueError("Unknown mode!")

    def _load_dataset_from_local(self, list_file='images_map_Train.tsv'):
        """
        load jpeg dataset according to a file of file-list.

        :param list_file: a tsv file with each line for a jpeg file path
        :return:(X,Y) for SVM
        :raises KeyError: if a listed image has no feature file under Feat/Train/
        """
        list_file = base_dir + list_file

        X = []
        Y = []
        dict_tagbuf = {}
        dict_dataset = {}

        # Column 0 is the image name (without extension), column 1 its tag.
        with open(list_file, 'rb') as tsvfile:
            for line in csv.reader(tsvfile, delimiter='\t'):
                imgname = line[0] + '.jpg'
                dict_tagbuf[imgname] = line[1]

        feat_dir = base_dir + 'Feat/'
        for path, subdirs, files in os.walk(feat_dir + 'Train/'):
            for name in files:
                featpath = os.path.join(path, name)
                with open(featpath, 'rb') as featfile:
                    # Image key = name of the containing directory + file name
                    # with '.mpb' swapped for '.jpg'.
                    imgname = path.split('/')[-1] + name.replace('.mpb', '.jpg')
                    dict_dataset[imgname] = json.loads(featfile.read())

        for imgname, tag in dict_tagbuf.items():
            X.append(dict_dataset[imgname])
            Y.append(1 if tag == 'True' else 0)

        return X, Y

    def _load_dataset_from_hbase(self, table='ImgCV'):
        """
        Placeholder for loading a dataset from HBase; not implemented.

        :param table: table name (currently unused)
        :return: None
        """
        pass

    def _model_svm_train_sk(self, X, Y):
        """
        Train a scikit-learn LinearSVC, pickle it to 'res/tmp.model' and keep
        it on the instance.

        :param X: feature vectors
        :param Y: labels
        :return: the fitted LinearSVC
        """
        timer = Timer()
        timer.mark()
        lin_clf = svm.LinearSVC()
        lin_clf.fit(X, Y)
        with open('res/tmp.model', 'wb') as modelfile:
            pickle.dump(lin_clf, modelfile)

        timer.report()

        self.svm = 'sk'
        self.model = lin_clf

        return lin_clf

    def _model_svm_predict_sk(self, image, clf=None):
        """
        Predict with a scikit-learn classifier on the Y-channel TPM of a jpeg.

        :param image: jpeg image, passed to jpegObj.Jpeg
        :param clf: classifier to use; defaults to the in-memory 'sk' model,
                    else the one pickled at 'res/tmp.model'
        :return: result of clf.predict
        """
        if clf is None:
            if self.svm == 'sk' and self.model is not None:
                clf = self.model
            else:
                # NOTE: pickle.load executes arbitrary code from the file;
                # only load model files this program wrote itself.
                with open('res/tmp.model', 'rb') as modelfile:
                    clf = pickle.load(modelfile)

        im = jpegObj.Jpeg(image, key=sample_key)
        ciq = im.coef_arrays[jpegObj.colorMap['Y']]
        tpm = self.get_trans_prob_mat(ciq)

        # NOTE(review): tpm is a 3-D (2T+1, 2T+1, 4) tensor here; confirm it
        # matches the layout of the feature vectors used for training.
        return clf.predict(tpm)

    def _model_svm_train_cv(self, X, Y):
        """
        Train an OpenCV linear C-SVC, save it to 'res/svm_data.model' and keep
        it on the instance.

        :param X: feature vectors
        :param Y: labels
        :return: the trained cv2.SVM
        """
        svm_params = dict(kernel_type=cv2.SVM_LINEAR,
                          svm_type=cv2.SVM_C_SVC,
                          C=2.67, gamma=5.383)

        timer = Timer()
        timer.mark()
        # Named cv_svm so the module-level sklearn `svm` import is not shadowed.
        cv_svm = cv2.SVM()
        cv_svm.train(X, Y, params=svm_params)
        cv_svm.save('res/svm_data.model')

        self.svm = 'cv'
        self.model = cv_svm

        return cv_svm

    def _model_svm_predict_cv(self, image, svm=None):
        """
        Predict with an OpenCV SVM on the Y-channel TPM of a jpeg.

        :param image: jpeg image, passed to jpegObj.Jpeg
        :param svm: cv2.SVM to use; defaults to the in-memory 'cv' model,
                    else the one saved at 'res/svm_data.model'
        :return: result of svm.predict
        """
        if svm is None:
            if self.svm == 'cv' and self.model is not None:
                # Use the cached model (previously bound to an unused name,
                # leaving `svm` as None and crashing on predict).
                svm = self.model
            else:
                svm = cv2.SVM()
                svm.load('res/svm_data.model')

        im = jpegObj.Jpeg(image, key=sample_key)
        ciq = im.coef_arrays[jpegObj.colorMap['Y']]
        tpm = self.get_trans_prob_mat(ciq)

        # NOTE(review): tpm is a 3-D tensor; confirm it matches the sample
        # layout the model was trained with.
        return svm.predict(tpm)

    def train_svm(self):
        """
        Train the default (scikit-learn) SVM on the local training list.

        :return: the fitted classifier
        """
        X, Y = self.load_dataset('local', 'images_map_Train.tsv')
        return self._model_svm_train_sk(X, Y)

    def predict_svm(self, image):
        """
        Predict with the default (scikit-learn) SVM.

        :param image: jpeg image, passed to jpegObj.Jpeg
        :return: result of the classifier's predict
        """
        return self._model_svm_predict_sk(image)