__author__ = 'chunk'

"""
Yun Q. Shi, et al - A Markov Process Based Approach to Effective Attacking JPEG Steganography
"""

import time
import math
import numpy as np
from msteg.StegBase import *
import mjsteg
import jpegObj
from common import *
import csv
import json
import os
import pickle
from sklearn import svm

base_dir = '/home/hadoop/data/HeadShoulder/'


class MPB(StegBase):
    """
    Markov Process Based Steganalysis Algo.
    """

    def __init__(self):
        StegBase.__init__(self, sample_key)

    @staticmethod
    def _difference_pairs(ciq, T):
        """
        Build the (current, next) difference-array pairs for the four directions.

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array (pre-abs)
        :param T: clipping threshold applied to the absolute coefficients
        :return: 4-tuple of (current, next) array pairs, ordered
                 horizontal, vertical, main diagonal, minor diagonal.
                 Every element lies in [-T, T].
        """
        # Widen before abs(): on int16 input abs(-32768) wraps back to -32768,
        # and on unsigned input the subtractions below would wrap around.
        ciq = np.absolute(np.asarray(ciq).astype(np.intp)).clip(0, T)

        Fh = ciq[:-1, :-1] - ciq[:-1, 1:]
        Fv = ciq[:-1, :-1] - ciq[1:, :-1]
        Fd = ciq[:-1, :-1] - ciq[1:, 1:]
        Fm = ciq[:-1, 1:] - ciq[1:, :-1]

        return (
            (Fh[:-1, :-1], Fh[:-1, 1:]),
            (Fv[:-1, :-1], Fv[1:, :-1]),
            (Fd[:-1, :-1], Fd[1:, 1:]),
            (Fm[:-1, 1:], Fm[1:, :-1]),
        )

    def get_trans_prob_mat_orig(self, ciq, T=4):
        """
        Original! Calculate Transition Probability Matrix.

        Reference implementation: one boolean-mask count per (m, n, direction).
        Slow, kept for cross-checking get_trans_prob_mat().

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: signed integer, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4).
                 TPM[m, n, k] is count(cur == m and next == n) / count(cur == m)
                 for direction k; negative m / n use numpy's negative indexing.
                 Rows whose value m never occurs stay all-zero.
        """
        TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64)
        pairs = self._difference_pairs(ciq, T)

        for m in range(-T, T + 1):
            for k, (cur, nxt) in enumerate(pairs):
                # The mask and the denominator do not depend on n, so they are
                # computed once per (m, direction) instead of once per (m, n).
                cur_is_m = (cur == m)
                denom = np.sum(cur_is_m) * 1.0
                if denom == 0:
                    continue
                for n in range(-T, T + 1):
                    hits = np.sum(np.logical_and(cur_is_m, nxt == n))
                    TPM[m, n, k] = hits / denom

        return TPM

    def get_trans_prob_mat(self, ciq, T=4):
        """
        Calculate Transition Probability Matrix.

        Produces the same matrix as get_trans_prob_mat_orig(), but counts the
        transitions of each direction with a single np.bincount call.
        (A plain ``TPM[a, b, k] += 1`` with index arrays cannot be used here:
        numpy applies the increment only once per distinct index.)

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: signed integer, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4).
                 TPM[m, n, k] is count(cur == m and next == n) / count(cur == m)
                 for direction k; negative m / n use numpy's negative indexing.
                 Rows whose value m never occurs stay all-zero.
        """
        size = 2 * T + 1
        TPM = np.zeros((size, size, 4), np.float64)

        for k, (cur, nxt) in enumerate(self._difference_pairs(ciq, T)):
            # Map values in [-T, T] onto [0, size): a negative value v lands on
            # size + v, the same slot TPM[v] addresses through negative indexing.
            rows = np.mod(cur, size).ravel()
            cols = np.mod(nxt, size).ravel()
            if rows.size == 0:
                # Input too small to contain any transition; leave zeros.
                continue

            counts = np.bincount(rows * size + cols, minlength=size * size)
            counts = counts.reshape(size, size).astype(np.float64)

            # The row total equals the number of positions where cur == m,
            # i.e. the denominator of the conditional probability.
            totals = counts.sum(axis=1)
            seen = totals != 0
            counts[seen] /= totals[seen][:, np.newaxis]

            TPM[:, :, k] = counts

        return TPM

    def _load_dataset(self, list_file):
        """
        load jpeg dataset according to a file of file-list.

        :param list_file: a tsv file; column 0 is the image name without the
                          '.jpg' extension, column 1 is the tag ('True' is the
                          positive class, anything else is negative)
        :return: (X, Y) for SVM - X is the list of feature objects decoded from
                 the JSON feature files, Y the matching list of 0/1 labels
        :raises KeyError: if a listed image has no feature file under
                          base_dir + 'Feat/Train/'
        """
        dict_tagbuf = {}
        dict_dataset = {}

        # NOTE(review): binary mode is what the Python 2 csv module expects;
        # the Python 3 csv module needs text mode opened with newline=''.
        with open(list_file, 'rb') as tsvfile:
            for row in csv.reader(tsvfile, delimiter='\t'):
                if not row:
                    # csv.reader yields [] for a blank line; nothing to index.
                    continue
                dict_tagbuf[row[0] + '.jpg'] = row[1]

        feat_dir = base_dir + 'Feat/'
        for path, subdirs, files in os.walk(feat_dir + 'Train/'):
            for name in files:
                featpath = os.path.join(path, name)
                with open(featpath, 'rb') as featfile:
                    # Key = name of the containing directory + file name with
                    # '.mpb' replaced by '.jpg'.
                    imgname = path.split('/')[-1] + name.replace('.mpb', '.jpg')
                    dict_dataset[imgname] = json.loads(featfile.read())

        X = []
        Y = []
        for imgname, tag in dict_tagbuf.items():
            X.append(dict_dataset[imgname])
            Y.append(1 if tag == 'True' else 0)

        return X, Y