diff --git a/mjpeg/__init__.py b/mjpeg/__init__.py index b0df845..e8c3094 100644 --- a/mjpeg/__init__.py +++ b/mjpeg/__init__.py @@ -8,7 +8,8 @@ __all__ = ['Jpeg', 'colorMap', 'diffblock', 'diffblocks'] # functions from submodules. # # :: - +import numpy as np +from numpy import shape import numpy.random as rnd import base @@ -169,18 +170,6 @@ class Jpeg(Jsteg): E = [-np.inf] + [i for i in range(-T, T + 2)] + [np.inf] return np.histogram(A, E) - def plotHist(self, mask=base.acMaskBlock, T=8): - """ - Make a histogram of the jpeg coefficients. - The mask is a boolean 8x8 matrix indicating the - frequencies to be included. This defaults to the - AC coefficients. - """ - A = self.rawsignal(mask).tolist() - E = [i for i in range(-T, T + 2)] - plt.hist(A, E, histtype='bar') - plt.show() - def nzcount(self, *a, **kw): """Number of non-zero AC coefficients. diff --git a/mjpeg/compress.py b/mjpeg/compress.py index 98e9a84..8566bcf 100644 --- a/mjpeg/compress.py +++ b/mjpeg/compress.py @@ -1,7 +1,7 @@ ## -*- coding: utf-8 -*- - -from pylab import * +from numpy import array +# from pylab import * # The standard quantisation tables for JPEG:: diff --git a/msteg/steganalysis/ChiSquare.py b/msteg/steganalysis/ChiSquare.py deleted file mode 100644 index 7d0bb2c..0000000 --- a/msteg/steganalysis/ChiSquare.py +++ /dev/null @@ -1,162 +0,0 @@ -""" -

-This module implements an algorithm described by Andreas Westfeld in [1,2], -which detects if there was data embedded into an image using JSteg. -It uses the property that JSteg generates pairs of values in the -DCT-coefficients histogram, which can be detected by a \chi^2 test. -

- -
-[1]: Andreas Westfeld, F5 - A Steganographic Algorithm High Capacity Despite
-Better Steganalysis
-[2]: Andreas Westfeld, Angriffe auf steganographische Systeme
-
-""" - -from collections import defaultdict -import os - -from PIL import Image -import numpy -from scipy.stats import chisquare -import matplotlib.pyplot as plt -import itertools as it - -from .. import * - - -class ChiSquare(StegBase): - """ - The module contains only one method, detect. - """ - - def __init__(self, ui, core): - self.ui = ui - self.core = core - - def detect(self, src, tgt, tgt2): - """ -

- Detect if there was data embedded in the source image image with - JSteg algorithm. -

- -

- Parameters: -

    -
  1. Source image
    Image which should be tested
  2. -
  3. Target image
    Image which displays a graphic with the - embedding probability
  4. -
  5. 2nd Target image
    Image which displays the embedding - positions in the image
  6. -
-

- """ - # --------------------------- Input ----------------------------------- - # If src is from the image pool, test whether the image exists encoded - # on the file system. Otherwise we can not read DCT-coefficients. - if self.core.media_manager.is_media_key(src): - src = self.core.media_manager.get_file(src) - if hasattr(src, 'tmp_file'): - src = src.tmp_file - self.ui.display_error('Trying file: %s' % src) - else: - self.ui.display_error('Can not detect anything from \ - decoded images.') - return - # Test whether the file exists. - if not os.path.isfile(src): - self.ui.display_error('No such file.') - return - # Test if it is a JPEG file. - if not self._looks_like_jpeg(src): - self.ui.display_error('Input is probably not a JPEG file.') - return - - # ---------------------------- Algorithm ------------------------------ - # Build DCT-histogram in steps of \approx 1% of all coefficients and - # calculate the p-value at each step. - - # dct_data = rw_dct.read_dct_coefficients(src) - dct_data = self._get_cov_data(src) - - hist = defaultdict(int) - cnt = 0 - l = len(dct_data) - one_p = l / 100 - result = [] - for block in dct_data: - # update the histogram with one block of 64 coefficients - for c in block: - hist[c] += 1 - - cnt += 1 - if not cnt % one_p: - # calculate p-value - self.ui.set_progress(cnt * 100 / l) - - # ignore the pair (0, 1), since JSteg does not embed data there - hl = [hist[i] for i in range(-2048, 2049) if not i in (0, 1)] - k = len(hl) / 2 - observed = [] - expected = [] - # calculate observed and expected distribution - for i in range(k): - t = hl[2 * i] + hl[2 * i + 1] - if t > 3: - observed.append(hl[2 * i]) - expected.append(t / 2) - # calculate (\chi^2, p) - p = chisquare(numpy.array(observed), numpy.array(expected))[1] - result.append(p) - - # ----------------------------- Output -------------------------------- - # Graph displaying the embedding probabilities in relation to the - # sample size. - figure = plt.figure() - plot = figure.add_subplot(111) - plot.grid(True) - plot.plot(result, color='r', linewidth=2.0) - plt.axis([0, 100, 0, 1.1]) - plt.title('Embedding probability for different percentages \ -of the file capacity.') - plt.xlabel('% of file capacity') - plt.ylabel('Embedding probability') - - if self.core.media_manager.is_media_key(tgt): - img = figure_to_pil(figure) - self.core.media_manager.put_media(tgt, img) - else: - plt.savefig(tgt) - - # Image displaying the length and position of the embedded data - # within the image - img2 = Image.open(src) - img2.convert("RGB") - width, height = img2.size - - for i in range(100): - result[i] = max(result[i:]) - - cnt2 = 0 - for (top, left) in it.product(range(0, height, 8), range(0, width, 8)): - if not cnt2 % one_p: - r = result[cnt2 / one_p] - if r >= 0.5: - color = (255, int((1 - r) * 2 * 255), 0) - else: - color = (int(r * 2 * 255), 255, 0) - cnt2 += 1 - img2.paste(color, (left, top, min(left + 8, width), - min(top + 8, height))) - self.core.media_manager.put_media(tgt2, img2) - - def __str__(self): - return 'Chi-Square-Test' - - -def figure_to_pil(figure): - figure.canvas.draw() - return Image.fromstring('RGB', - figure.canvas.get_width_height(), - figure.canvas.tostring_rgb()) diff --git a/msteg/steganalysis/MPB.py.bak b/msteg/steganalysis/MPB.py.bak deleted file mode 100644 index f71d1a8..0000000 --- a/msteg/steganalysis/MPB.py.bak +++ /dev/null @@ -1,300 +0,0 @@ -__author__ = 'chunk' -""" -Yun Q. Shi, et al - A Markov Process Based Approach to Effective Attacking JPEG Steganography -""" - -import time -import math -import numpy as np - -from .. import * -from ...mjpeg import Jpeg,colorMap -from ...common import * - -import csv -import json -import pickle -import cv2 -from sklearn import svm - -base_dir = '/home/hadoop/data/HeadShoulder/' - - -class MPB(StegBase): - """ - Markov Process Based Steganalyasis Algo. - """ - - def __init__(self): - StegBase.__init__(self, sample_key) - self.model = None - self.svm = None - - def _get_trans_prob_mat_orig(self, ciq, T=4): - """ - Original! - Calculate Transition Probability Matrix. - - :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs) - :param T: signed integer, usually 1~7 - :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4) - """ - ciq = np.absolute(ciq).clip(0, T) - TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64) - # Fh = np.diff(ciq, axis=-1) - # Fv = np.diff(ciq, axis=0) - Fh = ciq[:-1, :-1] - ciq[:-1, 1:] - Fv = ciq[:-1, :-1] - ciq[1:, :-1] - Fd = ciq[:-1, :-1] - ciq[1:, 1:] - Fm = ciq[:-1, 1:] - ciq[1:, :-1] - - Fh1 = Fh[:-1, :-1] - Fh2 = Fh[:-1, 1:] - - Fv1 = Fv[:-1, :-1] - Fv2 = Fv[1:, :-1] - - Fd1 = Fd[:-1, :-1] - Fd2 = Fd[1:, 1:] - - Fm1 = Fm[:-1, 1:] - Fm2 = Fm[1:, :-1] - - # original:(very slow!) - for n in range(-T, T + 1): - for m in range(-T, T + 1): - dh = np.sum(Fh1 == m) * 1.0 - dv = np.sum(Fv1 == m) * 1.0 - dd = np.sum(Fd1 == m) * 1.0 - dm = np.sum(Fm1 == m) * 1.0 - - if dh != 0: - TPM[m, n, 0] = np.sum(np.logical_and(Fh1 == m, Fh2 == n)) / dh - - if dv != 0: - TPM[m, n, 1] = np.sum(np.logical_and(Fv1 == m, Fv2 == n)) / dv - - if dd != 0: - TPM[m, n, 2] = np.sum(np.logical_and(Fd1 == m, Fd2 == n)) / dd - - if dm != 0: - TPM[m, n, 3] = np.sum(np.logical_and(Fm1 == m, Fm2 == n)) / dm - - # 1.422729s - return TPM - - - def get_trans_prob_mat(self, ciq, T=4): - """ - Calculate Transition Probability Matrix. - - :param ciq: jpeg DCT coeff matrix, 2-D numpy array of int16 (pre-abs) - :param T: signed integer, usually 1~7 - :return: TPM - 3-D tensor, numpy array of size (2*T+1, 2*T+1, 4) - """ - - return self._get_trans_prob_mat_orig(ciq, T) - - - # timer = Timer() - ciq = np.absolute(ciq).clip(0, T) - TPM = np.zeros((2 * T + 1, 2 * T + 1, 4), np.float64) - # Fh = np.diff(ciq, axis=-1) - # Fv = np.diff(ciq, axis=0) - Fh = ciq[:-1, :-1] - ciq[:-1, 1:] - Fv = ciq[:-1, :-1] - ciq[1:, :-1] - Fd = ciq[:-1, :-1] - ciq[1:, 1:] - Fm = ciq[:-1, 1:] - ciq[1:, :-1] - - Fh1 = Fh[:-1, :-1].ravel() - Fh2 = Fh[:-1, 1:].ravel() - - Fv1 = Fv[:-1, :-1].ravel() - Fv2 = Fv[1:, :-1].ravel() - - Fd1 = Fd[:-1, :-1].ravel() - Fd2 = Fd[1:, 1:].ravel() - - Fm1 = Fm[:-1, 1:].ravel() - Fm2 = Fm[1:, :-1].ravel() - - - - # 0.089754s - # timer.mark() - # TPM[Fh1.ravel(), Fh2.ravel(), 0] += 1 - # TPM[Fv1.ravel(), Fv2.ravel(), 1] += 1 - # TPM[Fd1.ravel(), Fd2.ravel(), 2] += 1 - # TPM[Fm1.ravel(), Fm2.ravel(), 3] += 1 - # timer.report() - - # 1.459668s - # timer.mark() - # for i in range(len(Fh1)): - # TPM[Fh1[i], Fh2[i], 0] += 1 - # for i in range(len(Fv1)): - # TPM[Fv1[i], Fv2[i], 1] += 1 - # for i in range(len(Fd1)): - # TPM[Fd1[i], Fd2[i], 2] += 1 - # for i in range(len(Fm1)): - # TPM[Fm1[i], Fm2[i], 3] += 1 - # timer.report() - - # 1.463982s - # timer.mark() - for m, n in zip(Fh1.ravel(), Fh2.ravel()): - TPM[m, n, 0] += 1 - - for m, n in zip(Fv1.ravel(), Fv2.ravel()): - TPM[m, n, 1] += 1 - - for m, n in zip(Fd1.ravel(), Fd2.ravel()): - TPM[m, n, 2] += 1 - - for m, n in zip(Fm1.ravel(), Fm2.ravel()): - TPM[m, n, 3] += 1 - # timer.report() - - # 0.057505s - # timer.mark() - for m in range(-T, T + 1): - dh = np.sum(Fh1 == m) * 1.0 - dv = np.sum(Fv1 == m) * 1.0 - dd = np.sum(Fd1 == m) * 1.0 - dm = np.sum(Fm1 == m) * 1.0 - - if dh != 0: - TPM[m, :, 0] /= dh - - if dv != 0: - TPM[m, :, 1] /= dv - - if dd != 0: - TPM[m, :, 2] /= dd - - if dm != 0: - TPM[m, :, 3] /= dm - # timer.report() - - return TPM - - def load_dataset(self, mode, file): - if mode == 'local': - return self._load_dataset_from_local(file) - elif mode == 'remote' or mode == 'hbase': - return self._load_dataset_from_hbase(file) - else: - raise Exception("Unknown mode!") - - def _load_dataset_from_local(self, list_file='images_map_Train.tsv'): - """ - load jpeg dataset according to a file of file-list. - - :param list_file: a tsv file with each line for a jpeg file path - :return:(X,Y) for SVM - """ - list_file = base_dir + list_file - - X = [] - Y = [] - dict_tagbuf = {} - dict_dataset = {} - - with open(list_file, 'rb') as tsvfile: - tsvfile = csv.reader(tsvfile, delimiter='\t') - for line in tsvfile: - imgname = line[0] + '.jpg' - dict_tagbuf[imgname] = line[1] - - dir = base_dir + 'Feat/' - for path, subdirs, files in os.walk(dir + 'Train/'): - for name in files: - featpath = os.path.join(path, name) - # print featpath - with open(featpath, 'rb') as featfile: - imgname = path.split('/')[-1] + name.replace('.mpb', '.jpg') - dict_dataset[imgname] = json.loads(featfile.read()) - - for imgname, tag in dict_tagbuf.items(): - tag = 1 if tag == 'True' else 0 - X.append(dict_dataset[imgname]) - Y.append(tag) - - return X, Y - - - def _load_dataset_from_hbase(self, table='ImgCV'): - pass - - - def _model_svm_train_sk(self, X, Y): - timer = Timer() - timer.mark() - lin_clf = svm.LinearSVC() - lin_clf.fit(X, Y) - with open('res/tmp.model', 'wb') as modelfile: - model = pickle.dump(lin_clf, modelfile) - - timer.report() - - self.svm = 'sk' - self.model = lin_clf - - return lin_clf - - def _model_svm_predict_sk(self, image, clf=None): - if clf is None: - if self.svm == 'sk' and self.model != None: - clf = self.model - else: - with open('res/tmp.model', 'rb') as modelfile: - clf = pickle.load(modelfile) - - im = mjpeg.Jpeg(image, key=sample_key) - ciq = im.coef_arrays[mjpeg.colorMap['Y']] - tpm = self.get_trans_prob_mat(ciq) - - return clf.predict(tpm) - - - def _model_svm_train_cv(self, X, Y): - svm_params = dict(kernel_type=cv2.SVM_LINEAR, - svm_type=cv2.SVM_C_SVC, - C=2.67, gamma=5.383) - - timer = Timer() - timer.mark() - svm = cv2.SVM() - svm.train(X, Y, params=svm_params) - svm.save('res/svm_data.model') - - self.svm = 'cv' - self.model = svm - - return svm - - def _model_svm_predict_cv(self, image, svm=None): - if svm is None: - if self.svm == 'cv' and self.model != None: - clf = self.model - else: - svm = cv2.SVM() - svm.load('res/svm_data.model') - - im = mjpeg.Jpeg(image, key=sample_key) - ciq = im.coef_arrays[mjpeg.colorMap['Y']] - tpm = self.get_trans_prob_mat(ciq) - - return svm.predict(tpm) - - def train_svm(self): - X, Y = self.load_dataset('local', 'images_map_Train.tsv') - return self._model_svm_train_sk(X, Y) - - def predict_svm(self, image): - return self._model_svm_predict_sk(image) - - - - - -- libgit2 0.21.2