"""
Yun Q. Shi, et al - A Markov Process Based Approach to Effective Attacking JPEG Steganography
"""
__author__ = 'chunk'

import time
import math

import numpy as np

from .. import *
from ...common import *

# from numba import jit

base_dir = '/home/hadoop/data/HeadShoulder/'


class MPB(StegBase):
    """
    Markov Process Based Steganalysis Algo.
    """

    def __init__(self):
        StegBase.__init__(self, sample_key)
        self.model = None
        self.svm = None

    @staticmethod
    def _clipped_difference_pairs(ciq, T):
        """
        Build the four (current, next) difference-array pairs used by the
        transition probability matrix.

        :param ciq: 2-D numpy array of coefficient magnitudes (already abs'ed)
        :param T: clipping threshold; differences are limited to [-T, T]
        :return: tuple of four (first, second) pairs of equally shaped 2-D
                 arrays, in the order horizontal, vertical, main diagonal,
                 minor diagonal
        """
        # Differences between each element and its right / lower /
        # lower-right neighbour, plus the anti-diagonal difference.
        Fh = (ciq[:-1, :-1] - ciq[:-1, 1:]).clip(-T, T)
        Fv = (ciq[:-1, :-1] - ciq[1:, :-1]).clip(-T, T)
        Fd = (ciq[:-1, :-1] - ciq[1:, 1:]).clip(-T, T)
        Fm = (ciq[:-1, 1:] - ciq[1:, :-1]).clip(-T, T)

        # Each pair couples a difference with its successor along the same
        # direction; both members are trimmed to a common shape.
        return (
            (Fh[:-1, :-1], Fh[:-1, 1:]),
            (Fv[:-1, :-1], Fv[1:, :-1]),
            (Fd[:-1, :-1], Fd[1:, 1:]),
            (Fm[:-1, 1:], Fm[1:, :-1]),
        )

    def _get_trans_prob_mat_orig(self, ciq, T=4):
        """
        Reference implementation of the Transition Probability Matrix.

        TPM[m, n, k] is the fraction of positions whose difference in
        direction k equals m for which the next difference equals n.
        Negative m / n are stored through Python negative indexing, i.e. the
        value v lives at index v % (2*T+1). Rows whose value m never occurs
        stay at zero.

        This is the slow but straightforward version (the original author's
        inline note recorded roughly 1.4 s per call, input size not stated).

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: non-negative integer threshold, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4)
        """
        ciq = np.absolute(ciq)
        TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64)

        pairs = self._clipped_difference_pairs(ciq, T)
        for k, (first, second) in enumerate(pairs):
            for m in range(-T, T + 1):
                # The mask and the denominator depend on m only, so they are
                # computed once per m instead of once per (m, n).
                is_m = first == m
                total = np.sum(is_m) * 1.0
                if total == 0:
                    continue
                for n in range(-T, T + 1):
                    TPM[m, n, k] = np.sum(np.logical_and(is_m, second == n)) / total

        return TPM

    def get_trans_prob_mat(self, ciq, T=4):
        """
        Calculate Transition Probability Matrix.

        Produces the same matrix as `_get_trans_prob_mat_orig` (same layout,
        same float64 values) but counts all (m, n) transitions of a direction
        in a single `np.bincount` pass instead of looping over every (m, n)
        combination. Input whose dtype is not a signed integer is handed to
        the reference implementation unchanged, because the counting path
        relies on the differences being exact integers.

        :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs)
        :param T: non-negative integer threshold, usually 1~7
        :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4)
        """
        ciq = np.absolute(ciq)
        if not np.issubdtype(ciq.dtype, np.signedinteger):
            return self._get_trans_prob_mat_orig(ciq, T)

        size = 2 * T + 1
        TPM = np.zeros((size, size, 4), np.float64)

        pairs = self._clipped_difference_pairs(ciq, T)
        for k, (first, second) in enumerate(pairs):
            # value % size reproduces the negative-index layout of the
            # reference implementation (e.g. -1 -> size - 1).
            rows = first.ravel().astype(np.intp) % size
            cols = second.ravel().astype(np.intp) % size

            # Joint histogram of (first, second) values.
            counts = np.bincount(rows * size + cols, minlength=size * size)
            counts = counts.reshape(size, size)

            # Every element contributes to exactly one column, so the row sum
            # equals the number of positions where first == m.
            totals = counts.sum(axis=1, keepdims=True)

            # Rows that never occur are all-zero; dividing them by 1 keeps
            # them at zero without triggering a division warning.
            TPM[:, :, k] = counts / np.where(totals == 0, 1, totals)

        return TPM