"""

This module implements the rather sophisticated F5 algorithm which was invented by Andreas Westfeld.

Unlike its vastly inferior predecessors, namely F3 and F4, it features matrix encoding which makes it possible to embed a chunk of k bits within 2^k - 1 bits of the cover data and only change one bit (at most). A bit change is done by subtracting the absolute value of the corresponding DCT coefficient. When the embedding process begins, the parameter k is computed based on the capacity of the cover image and the prospective embedding ratio. With small amount of hidden data k becomes large which leads to a greater embedding efficiency (embedded information per bit change).
A permutation (initialized by a user-supplied seed) of the DCT coefficients helps to scatter each chunk across the entire image. F5 can be seen as meta-algorithm as it uses a coding scheme to change as little data as possible and then applies a simpler algorithm (such as F3) to actually embed data. That is why this module allows the user to specify which embedding function (one of JSteg, F3, F4) should be used. """ import time import math import numpy as np from stegotool.plugins.steganography.F4.F4 import F4 from stegotool.util.JPEGSteg import JPEGSteg from stegotool.util.plugins import describe_annotate_convert from stegotool.util.plugins import ident, ImagePath, FilePath, NewFilePath class F5(JPEGSteg): """ This module has two methods: embed_raw_data to embed data with the F5 algorithm and extract_raw_data to extract data which was embedded previously. """ def __init__(self, ui, core): """ Constructor of the F5 class. """ JPEGSteg.__init__(self, ui, core) self._embed_hook = self._embed_k self._extract_hook = self._extract_k self._embed_fun = None self.dct_p = None self.seed = None self.default_embedding = True self.steg_ind = -1 self.excess_bits = None # needed because k is embedded separately self.cov_ind = -1 self.k_coeff = -1 @describe_annotate_convert((None, None, ident), ("cover image", ImagePath, str), ("hidden data", FilePath, str), ("stego image", NewFilePath, str), ("seed", int, int), ("embedding behavior", ['Default', 'F3', 'JSteg'], str)) def embed_raw_data(self, src_cover, src_hidden, tgt_stego, seed, embed_fun): """

This method embeds arbitrary data into a cover image. The cover image must be a JPEG.

Parameters:

  1. src_cover
    A valid pathname to an image file which serves as cover image (the image which the secret image is embedded into).
  2. src_hidden
    A valid pathname to an arbitrary file that is supposed to be embedded into the cover image.
  3. tgt_stego
    Target pathname of the resulting stego image. You should save to a PNG or another lossless format, because many LSBs don't survive lossy compression.
  4. seed
    A seed for the random number generator that is responsible scattering the secret data within the cover image.
  5. param embed_fun
    Specifies which embedding function should be used. Must be one of 'Default', 'F3', 'Jsteg'. If 'Default' is selected, the algorithm uses the same behavior as Westfeld's implementation, i.e. decrementing absolute values for n > 1 (F3) and using F4 in the special case n = 1. Selecting F3 or JSteg results in using that scheme for all n.

""" self.t0 = time.time() self.seed = seed if embed_fun == 'F3': self._embed_fun = self._f3_embed self.default_embedding = False elif embed_fun == 'JSteg': self._embed_fun = self._jsteg_embed self.default_embedding = False elif embed_fun == 'Default': self._embed_fun = self._f3_embed self.default_embedding = True self.cov_ind = -1 JPEGSteg._post_embed_actions(self, src_cover, src_hidden, tgt_stego) @describe_annotate_convert((None, None, ident), ("stego image", ImagePath, str), ("hidden data", NewFilePath, str), ("seed", int, int), ("embedding behavior", ['Default', 'F3/JSteg'], str)) def extract_raw_data(self, src_steg, tgt_hidden, seed, embed_fun): """

This method extracts secret data from a stego image. It is (obviously) the inverse operation of embed_raw_data.

Parameters:

  1. src_stego
    A valid pathname to an image file which serves as stego image.
  2. tgt_hidden
    A pathname denoting where the extracted data should be saved to.
  3. param seed
    A seed for the random number generator that is responsible scattering the secret data within the cover image.
  4. param embed_fun
    Specifies which embedding function should be used. Must be one of 'Default', 'F3', 'JSteg'. If 'Default' is selected, the algorithm uses the same behavior as Westfeld's implementation, i.e. decrementing absolute values for n > 1 (F3) and using F4 in the special case n = 1. Selecting F3 or JSteg results in using that scheme for all n.
""" self.t0 = time.time() self.seed = seed self.steg_ind = -1 if embed_fun == 'F3/JSteg': self.default_embedding = False elif embed_fun == 'Default': self.default_embedding = True # excess bits occur when the size of extracted data is not a multiple # of k. if excess bits are available, they are prepended to hidden data self.excess_bits = None JPEGSteg._post_extract_actions(self, src_steg, tgt_hidden) def _embed_k(self, cov_data, hid_data): np.random.seed(self.seed) self.dct_p = np.random.permutation(cov_data.size) self.k_coeff = self._find_max_k(cov_data, hid_data) self.ui.display_status('setting k = %d' % self.k_coeff) k_split = self.lookup_tab.split_byte(self.k_coeff, 1)[-4:] # embed k in F3-like style for m in k_split: success = False while not success: self.cov_ind += 1 while cov_data[self.dct_p[self.cov_ind]] == 0 or \ self.dct_p[self.cov_ind] % 64 == 0: self.cov_ind += 1 if m != cov_data[self.dct_p[self.cov_ind]] & 1: cov_data[self.dct_p[self.cov_ind]] -= \ math.copysign(1, cov_data[self.dct_p[self.cov_ind]]) success = cov_data[self.dct_p[self.cov_ind]] != 0 def _extract_k(self, steg_data): # initializing the MT is done only once in order to retain the state self.dct_p = np.random.seed(self.seed) self.dct_p = np.random.permutation(self.steg_data.size) k_split = np.zeros(4, np.uint8) for i in xrange(k_split.size): self.steg_ind += 1 while self.steg_data[self.dct_p[self.steg_ind]] == 0 or\ self.dct_p[self.steg_ind] % 64 == 0: self.steg_ind += 1 k_split[i] = self.steg_data[self.dct_p[self.steg_ind]] & 1 self.k_coeff = self.lookup_tab.merge_words(tuple([0, 0, 0, 0] + list(k_split)), 1) def _find_max_k(self, cov_data, hid_data): cnt = 4 # information about k take up 4 bits # find number of DCT coefficients update_cnt = 10000 for i, c in enumerate(cov_data): if update_cnt == 0: self._set_progress( int(30 * (float(i) / float(cov_data.size)))) update_cnt = 10000 update_cnt -= 1 # pessimistic, but accurate estimation of the capacity of the image ci = int(c) if (not (ci is 0)) and (not ((i % 64) is 0)) \ and (not (ci is 1)) and (not (ci is -1)): cnt += 1 hid_size = hid_data.size cov_size = cnt if cov_size < hid_size: raise Exception("Cannot fit %d bits in %d DCT coefficients. \ Cover image is too small." % (hid_size, cov_size)) self.ui.display_status('DCT embedding ratio = %f' \ % (float(hid_size) / float(cov_size))) k = 1 while True: k += 1 n = (1 << k) - 1 num_chunks = cov_size / n num_emb_bits = num_chunks * k if num_emb_bits < hid_size: return min(k - 1, 15) # low level embedding functions def _f3_embed(self, cov_data, ind): cov_data[ind] -= math.copysign(1, cov_data[ind]) def _jsteg_embed(self, cov_data, ind): m = 1 ^ (cov_data[ind] & 1) cov_data[ind] = (cov_data[ind] & 0xffffe) | m def _raw_embed(self, cov_data, hid_data, status_begin=0): k = self.k_coeff n = (1 << k) - 1 if n == 1 and self.default_embedding: # in case k = n = 1, Westfeld's implementation uses F4 for # embedding. Therefore, if 'default' embedding has been selected # we will do the same f4 = F4(self.ui, self.core) f4.seed = self.seed f4.dct_p = self.dct_p f4.cov_ind = self.cov_ind cov_data = f4._raw_embed(cov_data, hid_data, 30) return cov_data cov_ind = self.cov_ind # preventing RSI by writing 'self' less often hid_ind = 0 remaining_bits = hid_data.size hid_size = float(hid_data.size) dct_p = self.dct_p update_cnt = int(hid_size / (70.0 * k)) while remaining_bits > 0: if update_cnt == 0: self._set_progress(30 + int((( hid_size - remaining_bits) / hid_size) * 70)) update_cnt = int(hid_size / (70.0 * k)) update_cnt -= 1 msg_chunk_size = min(remaining_bits, k) msg_chunk = np.zeros(k, np.int8) cov_chunk = np.zeros(n, np.int32) msg_chunk[0:msg_chunk_size] = hid_data[hid_ind:hid_ind + msg_chunk_size] hid_ind += k # get n DCT coefficients for i in xrange(n): cov_ind += 1 while cov_data[dct_p[cov_ind]] == 0 \ or dct_p[cov_ind] % 64 == 0: cov_ind += 1 cov_chunk[i] = dct_p[cov_ind] success = False while not success: # loop necessary because of shrinkage h = 0 for i in xrange(n): h ^= ((cov_data[cov_chunk[i]] & 1) * (i + 1)) scalar_x = 0 for i in xrange(k): scalar_x = (scalar_x << 1) + msg_chunk[i] s = scalar_x ^ h if s != 0: self._embed_fun(cov_data, cov_chunk[s - 1]) else: break if cov_data[cov_chunk[s - 1]] == 0: # test for shrinkage cov_chunk[s - 1:-1] = cov_chunk[s:] # adjusting cov_ind += 1 while cov_data[dct_p[cov_ind]] == 0 or\ dct_p[cov_ind] % 64 == 0: cov_ind += 1 cov_chunk[n - 1] = dct_p[cov_ind] else: success = True remaining_bits -= k self.k_coeff = -1 # prevent k being read from this instance return cov_data def _raw_extract(self, num_bits): k = self.k_coeff n = (1 << k) - 1 if self.is_header == None: self.is_header = True if n == 1 and self.default_embedding: f4 = F4(self.ui, self.core) f4.seed = self.seed f4.dct_p = self.dct_p f4.steg_data = self.steg_data f4.is_header = self.is_header f4.steg_ind = self.steg_ind hid_data = f4._raw_extract(num_bits) self.steg_ind = f4.steg_ind self.is_header = False return hid_data remaining_bits = num_bits hid_data = np.zeros(num_bits, np.uint8) hid_ind = 0 dct_p = self.dct_p is_header = False # signals whether or not extracting header if self.excess_bits != None: hid_data[hid_ind:hid_ind + self.excess_bits.size] = \ self.excess_bits hid_ind += self.excess_bits.size remaining_bits -= self.excess_bits.size curr_chunk = np.zeros(k, np.uint8) update_cnt = int(num_bits / (100.0 * k)) while remaining_bits > 0: if update_cnt == 0 and not is_header: self._set_progress(int(((float(num_bits) \ - remaining_bits) / num_bits) * 100)) update_cnt = int(num_bits / (100.0 * k)) update_cnt -= 1 steg_chunk = [0 for i in xrange(n)] for i in xrange(n): self.steg_ind += 1 while self.steg_data[dct_p[self.steg_ind]] == 0 or\ dct_p[self.steg_ind] % 64 == 0: self.steg_ind += 1 steg_chunk[i] = self.steg_data[dct_p[self.steg_ind]] h = 0 # hash value for i in xrange(n): h ^= ((steg_chunk[i] & 1) * (i + 1)) for i in xrange(k): curr_chunk[k - i - 1] = h % 2 h /= 2 l = min(k, remaining_bits) for i in xrange(l): hid_data[hid_ind] = curr_chunk[i] hid_ind += 1 # save excess bits (for later calls) if k > remaining_bits: self.excess_bits = curr_chunk[remaining_bits:] else: self.excess_bits = None remaining_bits -= k self.is_header = False return hid_data def __str__(self): return 'F5'