## -*- coding: utf-8 -*-

from libmjsteg import Jsteg

__all__ = ['Jpeg', 'colorMap', 'diffblock', 'diffblocks']

# We need standard components from :mod:`numpy`, and some auxiliary
# functions from submodules.
#
# ::

import numpy.random as rnd
from numpy import shape
import numpy as np
import pylab as plt
import base
from dct import bdct, ibdct
from compress import *

# The colour codes are defined in the JPEG standard.  We store
# them here for easy reference by name::

colorCode = {
    "GRAYSCALE": 1,
    "RGB": 2,
    "YCbCr": 3,
    "CMYK": 4,
    "YCCK": 5,
}
colorParam = ['Y', 'Cb', 'Cr']
colorMap = {'Y': 0, 'Cb': 1, 'Cr': 2}


def _splitBlocks(compMat):
    """
    Split a 2-D coefficient matrix into 8x8 blocks.

    The result is a new 4-D array indexed as (v, h, row, col).  Both
    dimensions of `compMat` must be multiples of 8, otherwise
    :func:`numpy.vsplit`/:func:`numpy.hsplit` raise an error.
    """
    # Floor division keeps the section counts integral on Python 3 as
    # well; on Python 2 it is identical to the old integer `/`.
    stripes = np.vsplit(compMat, compMat.shape[0] // 8)
    return np.array([np.hsplit(stripe, stripe.shape[1] // 8)
                     for stripe in stripes])


# The JPEG class
# ==============

class Jpeg(Jsteg):
    """
    The Jpeg class (derived from Jsteg) allows the user to extract a
    sequence of pseudo-randomly ordered jpeg coefficients for
    watermarking/steganography, and reinsert them.
    """

    def __init__(self, file=None, key=None, rndkey=True, image=None,
                 verbosity=1, **kw):
        """
        The constructor will return a new Object with data from the given
        file.

        The key is used to determine the order of the jpeg coefficients.
        If no key is given and `rndkey` is true, a random key of sixteen
        16-bit integers is drawn from base.sysrnd.  With `rndkey` false
        the key is None and the coefficients are not shuffled.

        Passing `image` raises NotImplementedError (compression is not
        implemented).  Remaining keyword arguments go to Jsteg.__init__.
        """
        # Identity test: `image != None` is evaluated elementwise when
        # `image` is an array and cannot be used as a condition.
        if image is not None:
            raise NotImplementedError("Compression is not yet implemented")
        Jsteg.__init__(self, file, **kw)
        self.verbosity = verbosity
        if verbosity > 0:
            print("[Jpeg] %s (%ix%i)" % (self.filename,
                                         self.image_width,
                                         self.image_height))
        if key is not None:
            self.key = key
        elif rndkey:
            self.key = [base.sysrnd.getrandbits(16) for x in range(16)]
        else:
            self.key = None

    def getkey(self):
        """Return the key used to shuffle the coefficients."""
        return self.key

    # 1D Signal Representations
    # -------------------------

    def rawsignal(self, mask=base.acMaskBlock, channel="All"):
        """
        Return a 1D array of AC coefficients.

        With channel "All" the selected coefficients of every component
        in self.coef_arrays are concatenated in order; otherwise only
        the named channel is used.

        (Most applications should use getsignal() rather than
        rawsignal().)
        """
        if channel == "All":
            components = list(self.coef_arrays)
        else:
            components = [self.coef_arrays[self.getCompID(channel)]]
        # The leading empty list reproduces the historical result type:
        # stacking onto [] promotes the output to floating point.
        parts = [[]]
        for X in components:
            (h, w) = X.shape
            parts.append(X[base.acMask(h, w, mask)])
        return np.hstack(parts)

    def getsignal(self, mask=base.acMaskBlock, channel="All"):
        """Return a 1D array of AC coefficients in random order.

        The order is the permutation generated after seeding numpy's
        global random generator with the key.  Without a key the raw
        signal is returned unshuffled.
        """
        R = self.rawsignal(mask, channel)
        if self.key is None:
            return R
        rnd.seed(self.key)
        return R[rnd.permutation(len(R))]

    def setsignal(self, R0, mask=base.acMaskBlock, channel="All"):
        """Reinsert coefficients from getsignal() in the correct positions.

        `R0` must have the length and ordering produced by getsignal()
        with the same `mask` and `channel`.  The coefficient arrays are
        updated in place and every block of each touched component is
        written back with Jsetblock().
        """
        if self.key is not None:
            # Invert the permutation applied by getsignal().
            rnd.seed(self.key)
            P = rnd.permutation(len(R0))
            R = np.array(R0)
            R[P] = R0
        else:
            R = R0
        if channel == "All":
            # Same component set as rawsignal(), so the lengths agree.
            compIDs = range(len(self.coef_arrays))
        else:
            compIDs = [self.getCompID(channel)]
        # Must be initialised whether or not a key is set.
        fst = 0
        for cID in compIDs:
            X = self.coef_arrays[cID]
            (h, w) = X.shape
            A = base.acMask(h, w, mask)
            # Count what the mask actually selects instead of assuming
            # 63 coefficients out of every 64.
            s = X[A].size
            X[A] = R[fst:(fst + s)]
            fst += s
            self._writeBlocks(cID)
        assert len(R) == fst

    def _writeBlocks(self, cID):
        """Write every 8x8 block of component `cID` back via Jsetblock()."""
        blocks = _splitBlocks(self.coef_arrays[cID])
        xmax, ymax = self.Jgetcompdim(cID)
        for y in range(ymax):
            for x in range(xmax):
                block = blocks[y, x]
                self.Jsetblock(x, y, cID,
                               bytearray(block.astype(np.int16)))

    # Histogram and Image Statistics
    # ------------------------------

    def abshist(self, mask=base.acMaskBlock, T=8):
        """
        Make a histogram of absolute values for a signal.

        Returns a dict mapping each of 0..T to its count, "high" to the
        number of coefficients with larger magnitude, and "total" to the
        number of coefficients.
        """
        A = np.abs(self.rawsignal(mask))
        L = len(A)
        D = {}
        C = 0
        for i in range(T + 1):
            D[i] = int(np.sum(A == i))
            C += D[i]
        D["high"] = L - C
        D["total"] = L
        return D

    def hist(self, mask=base.acMaskBlock, T=8):
        """
        Make a histogram of the jpeg coefficients.

        The mask is a boolean 8x8 matrix indicating the frequencies to
        be included.  This defaults to the AC coefficients.  The bin
        edges are -inf, -T, ..., T+1, +inf; the return value is that of
        numpy.histogram().
        """
        A = self.rawsignal(mask).tolist()
        E = [-np.inf] + list(range(-T, T + 2)) + [np.inf]
        return np.histogram(A, E)

    def plotHist(self, mask=base.acMaskBlock, T=8):
        """
        Plot a histogram of the jpeg coefficients.

        The mask is a boolean 8x8 matrix indicating the frequencies to
        be included.  This defaults to the AC coefficients.  The bin
        edges are -T, ..., T+1.
        """
        A = self.rawsignal(mask).tolist()
        E = list(range(-T, T + 2))
        plt.hist(A, E, histtype='bar')
        plt.show()

    def nzcount(self, *a, **kw):
        """Number of non-zero AC coefficients.

        Arguments are passed to rawsignal(), so a non-default mask could
        be specified to get other coefficients than the 63 AC
        coefficients.
        """
        return int(np.count_nonzero(self.rawsignal(*a, **kw)))

    # Access to JPEG Image Data
    # -------------------------

    def getCompID(self, channel):
        """
        Get the index of the given colour channel.

        Grayscale images accept "Y" or None; YCbCr images accept "Y",
        "Cb" or "Cr".  Any other designator raises Exception, and any
        other colour space raises NotImplementedError.
        """
        colourSpace = self.jpeg_color_space
        if colourSpace == colorCode["GRAYSCALE"]:
            if channel == "Y" or channel is None:
                return 0
            raise Exception("Invalid colour space designator")
        elif colourSpace == colorCode["YCbCr"]:
            if channel == "Y":
                return 0
            elif channel == "Cb":
                return 1
            elif channel == "Cr":
                return 2
            raise Exception("Invalid colour space designator")
        raise NotImplementedError("Only YCbCr and Grayscale are supported.")

    def getQMatrix(self, channel):
        """
        Return the quantisation matrix for the given colour channel.
        """
        cID = self.getCompID(channel)
        return self.quant_tables[self.comp_info[cID]["quant_tbl_no"]]

    def getCoefMatrix(self, channel="Y"):
        """
        This method returns the coefficient matrix for the given
        colour channel (as a matrix).
        """
        cID = self.getCompID(channel)
        return self.coef_arrays[cID]

    def setCoefMatrix(self, matrix, channel="Y"):
        """
        Replace the coefficient matrix of the given colour channel.

        `matrix` must have the same shape as the current matrix.  All
        blocks of the component are written back with Jsetblock().
        """
        v, h = self.getCoefMatrix(channel).shape
        assert matrix.shape == (v, h), \
            "matrix is expected of size (%d,%d)" % (v, h)
        cID = self.getCompID(channel)
        self.coef_arrays[cID] = matrix
        self._writeBlocks(cID)

    def getCoefBlocks(self, channel="Y"):
        """
        This method returns the coefficient matrix for the given
        colour channel (as a 4-D tensor: (v,h,row,col)).

        With channel "All" a list with one such tensor per component
        is returned.
        """
        if channel == "All":
            return [_splitBlocks(compMat) for compMat in self.coef_arrays]
        return _splitBlocks(self.getCoefMatrix(channel))

    def getCoefBlock(self, channel="Y", loc=(0, 0)):
        """
        This method returns the single 8x8 coefficient block at block
        position `loc` = (v, h) of the given colour channel.
        """
        return self.getCoefBlocks(channel)[loc]

    def setCoefBlock(self, block, channel="Y", loc=(0, 0)):
        """
        Store one 8x8 block at block position `loc` = (v, h) of the
        given colour channel and write it back with Jsetblock().
        """
        assert block.shape == (8, 8), "block is expected of size (8,8)"
        cID = self.getCompID(channel)
        v, h = loc[0] * 8, loc[1] * 8
        self.coef_arrays[cID][v:v + 8, h:h + 8] = block
        # Jsetblock takes (x, y), i.e. column index first.
        self.Jsetblock(loc[1], loc[0], cID,
                       bytearray(block.astype(np.int16)))

    def setCoefBlocks(self, blocks, channel="Y"):
        """
        Store a (v,h,8,8) tensor of blocks into the given colour channel
        and write every block back with Jsetblock().
        """
        assert blocks.shape[-2:] == (8, 8), "block is expected of size (8,8)"
        cID = self.getCompID(channel)
        vmax, hmax = blocks.shape[:2]
        for i in range(vmax):
            for j in range(hmax):
                v, h = i * 8, j * 8
                self.coef_arrays[cID][v:v + 8, h:h + 8] = blocks[i, j]
                self.Jsetblock(j, i, cID,
                               bytearray(blocks[i, j].astype(np.int16)))

    def getSize(self):
        """Return the image size as (width, height)."""
        return self.image_width, self.image_height

    def getCapacity(self, channel="Y"):
        """Return the number of non-zero AC coefficients in `channel`."""
        signal = self.rawsignal(channel=channel)
        return np.sum(signal != 0)

    def getQuality(self):
        """
        Quality rating algorithm from ImageMagick.

        e.g. find ./ -name "*.jpg" | xargs -i sh -c "echo -n {} && identify -quiet -verbose {} |grep -E 'Quality' "

        Ref - http://stackoverflow.com/questions/2024947/is-it-possible-to-tell-the-quality-level-of-a-jpeg,http://www.imagemagick.org/discourse-server/viewtopic.php?f=1&t=20235

        Uses the two-table lookup (bi_hash/bi_sum) when a second
        quantisation table is present, and the single-table lookup
        (single_hash/single_sum) otherwise.  Raises Exception when the
        first quantisation table is missing.
        """
        tables = self.quant_tables
        if len(tables) < 1 or tables[0] is None:
            raise Exception("Quantization Tables Illegal")
        table0 = tables[0].ravel()
        total = np.sum(table0)
        qvalue = table0[2] + table0[53]
        if len(tables) > 1 and tables[1] is not None:
            table1 = tables[1].ravel()
            total = total + np.sum(table1)
            qvalue = qvalue + table1[0] + table1[-1]
            hashtable = bi_hash
            sumtable = bi_sum
        else:
            hashtable = single_hash
            sumtable = single_sum
        # The rating is the 1-based index of the first table entry that
        # either statistic reaches.
        # NOTE(review): this follows the cited ImageMagick algorithm; the
        # mangled source does not show whether the assignment was inside
        # or after the loop -- confirm against upstream.
        for i in range(100):
            if qvalue >= hashtable[i] or total >= sumtable[i]:
                break
        quality = i + 1
        return quality

    # Decompression
    # -------------

    def getSpatial(self, channel="Y"):
        """
        This method returns one decompressed colour channel as a matrix.
        The appropriate JPEG coefficient matrix is dequantised (using
        the quantisation tables held by the object) and inverse DCT
        transformed.

        The result is shifted by 128 and converted to uint8 with a plain
        cast.
        """
        X = self.getCoefMatrix(channel)
        Q = self.getQMatrix(channel)
        (M, N) = shape(X)
        assert M % 8 == 0, "Image size not divisible by 8"
        assert N % 8 == 0, "Image size not divisible by 8"
        D = X * base.repmat(Q, (M // 8, N // 8))
        S = ibdct(D)
        # NOTE(review): the plain cast neither rounds nor clips, so
        # out-of-range samples wrap; kept as is for compatibility.
        return (S + 128).astype(np.uint8)

    # Complete, general decompression is not yet implemented::

    def getimage(self):
        """
        Decompress the image and return a PIL Image object.

        Not implemented: always raises NotImplementedError.
        """
        # Probably better to use a numpy image/array.
        # We miss the routines for upsampling and adjusting the size.
        raise NotImplementedError("Decompression is not yet implemented")

    # Calibration
    # -----------

    def getCalibrated(self, channel="Y", mode="all"):
        """
        Return a calibrated coefficient matrix for the given channel.
        Channel may be "Y", "Cb", or "Cr" for YCbCr format.
        For Grayscale images, it may be None or "Y".

        The decompressed channel is cropped by four pixels on each side
        (columns only when mode is "col"), transformed with bdct() and
        requantised with the channel's quantisation matrix.
        """
        S = self.getSpatial(channel)
        (M, N) = shape(S)
        assert M % 8 == 0, "Image size not divisible by 8"
        assert N % 8 == 0, "Image size not divisible by 8"
        if mode == "col":
            S1 = S[:, 4:(N - 4)]
            cShape = (M // 8, N // 8 - 1)
        else:
            S1 = S[4:(M - 4), 4:(N - 4)]
            cShape = ((M - 1) // 8, (N - 1) // 8)
        # getSpatial() returns uint8; widen to a signed type first so
        # that the level shift cannot wrap around for pixels below 128.
        D = bdct(S1.astype(np.int16) - 128)
        X = D / base.repmat(self.getQMatrix(channel), cShape)
        return np.round(X)

    def calibrate(self, *a, **kw):
        """
        Replace the coefficient matrix by its calibrated version.

        Only valid for single-component images; arguments are passed to
        getCalibrated().
        """
        assert len(self.coef_arrays) == 1
        self.coef_arrays[0] = self.getCalibrated(*a, **kw)

    def getCalSpatial(self, channel="Y"):
        """
        Return the decompressed, calibrated, grayscale image.
        A different colour channel can be selected with the channel
        argument.
        """
        # We calibrate the image, obtaining a JPEG matrix.
        C = self.getCalibrated(channel)
        # The rest is straightforward JPEG decompression.
        (M, N) = shape(C)
        cShape = (M // 8, N // 8)
        D = C * base.repmat(self.getQMatrix(channel), cShape)
        S = np.round(ibdct(D) + 128)
        return S.astype(np.uint8)


def diffblock(c1, c2):
    """Print whether two blocks are equal; return True if they differ."""
    diff = False
    if np.array_equal(c1, c2):
        print("blocks match")
    else:
        print("blocks not match")
        diff = True
    return diff


def diffblocks(a, b):
    """
    Compare two objects block by block using Jgetblock().

    The components and block dimensions of `a` define what is compared.
    Every differing block is reported on stdout.  Returns a tuple
    (diff, cnt): whether any block differs, and how many do.
    """
    diff = False
    cnt = 0
    for comp in range(a.image_components):
        xmax, ymax = a.Jgetcompdim(comp)
        for y in range(ymax):
            for x in range(xmax):
                if a.Jgetblock(x, y, comp) != b.Jgetblock(x, y, comp):
                    print("blocks({},{}) in component {} not match".format(
                        y, x, comp))
                    diff = True
                    cnt += 1
    return diff, cnt