Commit ea1eb31a0f395ca7810bb29b79184052b23dbdf8
1 parent ad70caf6
Exists in master and in 1 other branch
Spark is privileged... we are going to write a special data module to process Spark & HBase data.
Showing 5 changed files with 458 additions and 11 deletions
mdata/CV.py
... | ... | @@ -89,7 +89,7 @@ class DataCV(DataDumperBase): |
89 | 89 | 'cf_info': dict(max_versions=10), |
90 | 90 | 'cf_tag': dict(), |
91 | 91 | 'cf_feat': dict(), |
92 | - } | |
92 | + } | |
93 | 93 | self.connection.create_table(name=self.table_name, families=families) |
94 | 94 | |
95 | 95 | table = self.connection.table(name=self.table_name) |
... | ... | @@ -250,7 +250,7 @@ class DataCV(DataDumperBase): |
250 | 250 | if self.sparkcontex == None: |
251 | 251 | self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') |
252 | 252 | |
253 | - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} | |
253 | + result = self.sparkcontex.read_hbase(self.table_name, collect=True) # result = {key:[feat,tag],...} | |
254 | 254 | for feat, tag in result: |
255 | 255 | X.append(feat) |
256 | 256 | Y.append(tag) |
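For context on this call-site change: after the rename, read_hbase only materialises results on the driver when asked to, so DataCV passes collect=True to keep its old behaviour. A minimal sketch of the difference (the import path and table name are illustrative; Sparker comes from the mspark/SC.py hunk further down):

    from mspark import SC   # illustrative import path; the repo itself uses relative imports

    sparker = SC.Sparker(host='HPC-server', appname='ImageCV',
                         master='spark://HPC-server:7077')

    # Old: read_habase(table_name) always collect()-ed the RDD before returning.
    # New: read_hbase(table_name) returns the RDD itself; collect=True restores
    #      the eager behaviour that this loop in DataCV.load_data() relies on.
    result = sparker.read_hbase('ImageCV', collect=True)
    for feat, tag in result:
        pass  # consumed exactly like the loop above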
... | ... | @@ -0,0 +1,442 @@ |
1 | +__author__ = 'chunk' | |
2 | + | |
3 | +from . import * | |
4 | +from ..mfeat import HOG, IntraBlockDiff | |
5 | +from ..mspark import SC | |
6 | +from ..common import * | |
7 | + | |
8 | +import os, sys | |
9 | +from PIL import Image | |
10 | +from hashlib import md5 | |
11 | +import csv | |
12 | +import shutil | |
13 | +import json | |
14 | +import collections | |
15 | +import happybase | |
16 | + | |
17 | +from ..mjpeg import * | |
18 | +from ..msteg import * | |
19 | +from ..msteg.steganography import LSB, F3, F4, F5 | |
20 | + | |
21 | +import numpy as np | |
22 | +from numpy.random import randn | |
23 | +import pandas as pd | |
24 | +from scipy import stats | |
25 | + | |
26 | +from subprocess import Popen, PIPE, STDOUT | |
27 | + | |
28 | + | |
29 | +np.random.seed(sum(map(ord, "whoami"))) | |
30 | + | |
31 | +package_dir = os.path.dirname(os.path.abspath(__file__)) | |
32 | + | |
33 | + | |
34 | +class DataILSVRCS(DataDumperBase): | |
35 | + def __init__(self, base_dir='/media/chunk/Elements/D/data/ImageNet/img/ILSVRC2013_DET_val', category='Train'): | |
36 | + DataDumperBase.__init__(self, base_dir, category) | |
37 | + | |
38 | + self.base_dir = base_dir | |
39 | + self.category = category | |
40 | + self.data_dir = os.path.join(self.base_dir, self.category) | |
41 | + | |
42 | + self.dst_dir = os.path.join(self.base_dir, 'dst', self.category) | |
43 | + self.list_file = os.path.join(self.dst_dir, 'file-tag.tsv') | |
44 | + self.feat_dir = os.path.join(self.dst_dir, 'Feat') | |
45 | + self.img_dir = os.path.join(self.dst_dir, 'Img') | |
46 | + | |
47 | + self.dict_data = {} | |
48 | + | |
49 | + self.table_name = self.base_dir.strip('/').split('/')[-1] + '-' + self.category | |
50 | + self.sparkcontex = None | |
51 | + | |
52 | + def format(self): | |
53 | + self.extract() | |
54 | + | |
55 | + def _hash_copy(self, image): | |
56 | + if not image.endswith('jpg'): | |
57 | + img = Image.open(image) | |
58 | + img.save('../res/tmp.jpg', format='JPEG') | |
59 | + image = '../res/tmp.jpg' | |
60 | + | |
61 | + with open(image, 'rb') as f: | |
62 | + index = md5(f.read()).hexdigest() | |
63 | + | |
64 | + im = Jpeg(image, key=sample_key) | |
65 | + self.dict_data[index] = [im.image_width, im.image_height, im.image_width * im.image_height, im.getCapacity(), | |
66 | + im.getQuality()] | |
67 | + | |
68 | + # self.dict_data[index] = [im.image_width, im.image_height, os.path.getsize(image), im.getQuality()] | |
69 | + | |
70 | + # origion: | |
71 | + # dir = base_dir + 'Img/Train/' + index[:3] | |
72 | + dir = os.path.join(self.img_dir, index[:3]) | |
73 | + if not os.path.exists(dir): | |
74 | + os.makedirs(dir) | |
75 | + image_path = os.path.join(dir, index[3:] + '.jpg') | |
76 | + # print image_path | |
77 | + | |
78 | + if not os.path.exists(image_path): | |
79 | + shutil.copy(image, image_path) | |
80 | + else: | |
81 | + pass | |
82 | + | |
83 | + def get_feat(self, image, feattype='ibd', **kwargs): | |
84 | + size = kwargs.get('size', (48, 48)) | |
85 | + | |
86 | + if feattype == 'hog': | |
87 | + feater = HOG.FeatHOG(size=size) | |
88 | + elif feattype == 'ibd': | |
89 | + feater = IntraBlockDiff.FeatIntraBlockDiff() | |
90 | + else: | |
91 | + raise Exception("Unknown feature type!") | |
92 | + | |
93 | + desc = feater.feat(image) | |
94 | + | |
95 | + return desc | |
96 | + | |
97 | + | |
98 | + def extract_feat(self, feattype='ibd'): | |
99 | + if feattype == 'hog': | |
100 | + feater = HOG.FeatHOG(size=(48, 48)) | |
101 | + elif feattype == 'ibd': | |
102 | + feater = IntraBlockDiff.FeatIntraBlockDiff() | |
103 | + else: | |
104 | + raise Exception("Unknown feature type!") | |
105 | + | |
106 | + list_image = [] | |
107 | + with open(self.list_file, 'rb') as tsvfile: | |
108 | + tsvfile = csv.reader(tsvfile, delimiter='\t') | |
109 | + for line in tsvfile: | |
110 | + list_image.append(line[0]) | |
111 | + | |
112 | + dict_featbuf = {} | |
113 | + for imgname in list_image: | |
114 | + # if imgtag == 'True': | |
115 | + image = os.path.join(self.img_dir, imgname[:3], imgname[3:] + '.jpg') | |
116 | + desc = feater.feat(image) | |
117 | + dict_featbuf[imgname] = desc | |
118 | + | |
119 | + for imgname, desc in dict_featbuf.items(): | |
120 | + # print imgname, desc | |
121 | + dir = os.path.join(self.feat_dir, imgname[:3]) | |
122 | + if not os.path.exists(dir): | |
123 | + os.makedirs(dir) | |
124 | + featpath = os.path.join(dir, imgname[3:].split('.')[0] + '.' + feattype) | |
125 | + with open(featpath, 'wb') as featfile: | |
126 | + featfile.write(json.dumps(desc.tolist())) | |
127 | + | |
128 | + def _build_list(self, list_file=None): | |
129 | + if list_file == None: | |
130 | + list_file = self.list_file | |
131 | + assert list_file != None | |
132 | + | |
133 | + ordict_img = collections.OrderedDict(sorted(self.dict_data.items(), key=lambda d: d[0])) | |
134 | + | |
135 | + with open(list_file, 'w') as f: | |
136 | + tsvfile = csv.writer(f, delimiter='\t') | |
137 | + for key, value in ordict_img.items(): | |
138 | + tsvfile.writerow([key] + value) | |
139 | + | |
140 | + def _anaylis(self, list_file=None): | |
141 | + if list_file == None: | |
142 | + list_file = self.list_file | |
143 | + assert list_file != None | |
144 | + | |
145 | + df_ILS = pd.read_csv(list_file, names=['hash', 'width', 'height', 'size', 'capacity', 'quality'], sep='\t') | |
146 | + length = df_ILS.shape[0] | |
147 | + df_ILS = df_ILS.sort(['capacity', 'size', 'quality'], ascending=True) | |
148 | + rand_class = stats.bernoulli.rvs(0.8, size=length) | |
149 | + | |
150 | + df_ILS['rate'] = np.zeros(df_ILS.shape[0], np.float64) | |
151 | + df_ILS['chosen'] = rand_class | |
152 | + df_ILS['class'] = np.zeros(length, np.int32) | |
153 | + | |
154 | + df_ILS.to_csv(list_file, header=False, index=False, sep='\t') | |
155 | + | |
156 | + def extract(self): | |
157 | + for path, subdirs, files in os.walk(self.data_dir): | |
158 | + for name in files: | |
159 | + imagepath = os.path.join(path, name) | |
160 | + # print imagepath | |
161 | + try: | |
162 | + self._hash_copy(imagepath) | |
163 | + except: | |
164 | + pass | |
165 | + | |
166 | + self._build_list() | |
167 | + self._anaylis() | |
168 | + | |
169 | + | |
170 | + def _embed_outer(self): | |
171 | + self.dict_data = {} | |
172 | + dict_embedresult = {} | |
173 | + os.environ["CLASSPATH"] = os.path.join(package_dir, "../libs/F5/") | |
174 | + cmd = 'java Embed %s %s -e %s -p password -c "stegan by chunk " -q %d' | |
175 | + | |
176 | + df_ILS = pd.read_csv(self.list_file, | |
177 | + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'chosen', 'class'], | |
178 | + sep='\t') | |
179 | + df_ILS_TARGET = df_ILS[df_ILS['chosen'] == 1] | |
180 | + | |
181 | + for hash, size, quality in zip(df_ILS_TARGET['hash'], df_ILS_TARGET['size'], df_ILS_TARGET['quality']): | |
182 | + path_img = os.path.join(self.img_dir, hash[:3], hash[3:] + '.jpg') | |
183 | + if path_img: | |
184 | + print path_img | |
185 | + p = Popen(cmd % (path_img, 'res/tmp.jpg', 'res/toembed', quality), shell=True, stdout=PIPE, | |
186 | + stderr=STDOUT) | |
187 | + dict_embedresult[hash] = [line.strip('\n') for line in p.stdout.readlines()] | |
188 | + try: | |
189 | + self._hash_copy('res/tmp.jpg') | |
190 | + except: | |
191 | + pass | |
192 | + | |
193 | + with open(self.list_file + '.embed.log', 'wb') as f: | |
194 | + tsvfile = csv.writer(f, delimiter='\t') | |
195 | + for key, value in dict_embedresult.items(): | |
196 | + tsvfile.writerow([key] + value) | |
197 | + | |
198 | + self._build_list(self.list_file + '.embed') | |
199 | + | |
200 | + # merge | |
201 | + df_ILS_EMBED = pd.read_csv(self.list_file + '.embed', names=['hash', 'width', 'height', 'size', 'quality'], | |
202 | + sep='\t') | |
203 | + length = df_ILS_EMBED.shape[0] | |
204 | + df_ILS_EMBED = df_ILS_EMBED.sort(['size', 'quality'], ascending=True) | |
205 | + df_ILS_EMBED['chosen'] = np.zeros(length, np.int32) | |
206 | + df_ILS_EMBED['class'] = np.ones(length, np.int32) | |
207 | + | |
208 | + df_ILS = df_ILS.append(df_ILS_EMBED, ignore_index=True) | |
209 | + df_ILS.to_csv(self.list_file, header=False, index=False, sep='\t') | |
210 | + | |
211 | + def _embed_inner(self, rate=None): | |
212 | + self.dict_data = {} | |
213 | + f5 = F5.F5(sample_key, 1) | |
214 | + tmp_img = os.path.join(package_dir, '../res/tmp.jpg') | |
215 | + df_ILS = pd.read_csv(self.list_file, | |
216 | + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'rate', 'chosen', | |
217 | + 'class'], | |
218 | + sep='\t') | |
219 | + df_ILS_TARGET = df_ILS[df_ILS['chosen'] == 1] | |
220 | + | |
221 | + for hash, capacity in zip(df_ILS_TARGET['hash'], df_ILS_TARGET['capacity']): | |
222 | + path_img = os.path.join(self.img_dir, hash[:3], hash[3:] + '.jpg') | |
223 | + if path_img: | |
224 | + print path_img | |
225 | + if rate == None: | |
226 | + embed_rate = f5.embed_raw_data(path_img, os.path.join(package_dir, '../res/toembed'), tmp_img) | |
227 | + else: | |
228 | + assert (rate >= 0 and rate < 1) | |
229 | + # print capacity | |
230 | + hidden = np.random.bytes(int(capacity * rate) / 8) | |
231 | + embed_rate = f5.embed_raw_data(path_img, hidden, tmp_img, frommem=True) | |
232 | + try: | |
233 | + with open(tmp_img, 'rb') as f: | |
234 | + index = md5(f.read()).hexdigest() | |
235 | + im = Jpeg(tmp_img, key=sample_key) | |
236 | + self.dict_data[index] = [im.image_width, im.image_height, im.image_width * im.image_height, | |
237 | + im.getCapacity(), | |
238 | + im.getQuality(), embed_rate] | |
239 | + | |
240 | + dir = os.path.join(self.img_dir, index[:3]) | |
241 | + if not os.path.exists(dir): | |
242 | + os.makedirs(dir) | |
243 | + image_path = os.path.join(dir, index[3:] + '.jpg') | |
244 | + if not os.path.exists(image_path): | |
245 | + shutil.copy(tmp_img, image_path) | |
246 | + else: | |
247 | + pass | |
248 | + except: | |
249 | + pass | |
250 | + | |
251 | + self._build_list(self.list_file + '.embed') | |
252 | + | |
253 | + # merge | |
254 | + df_ILS_EMBED = pd.read_csv(self.list_file + '.embed', | |
255 | + names=['hash', 'width', 'height', 'size', 'capacity', 'quality', 'rate'], | |
256 | + sep='\t') | |
257 | + | |
258 | + df_ILS_EMBED = df_ILS_EMBED.sort(['rate', 'capacity', 'size', 'quality'], ascending=True) | |
259 | + df_ILS_EMBED['chosen'] = np.zeros(df_ILS_EMBED.shape[0], np.int32) | |
260 | + df_ILS_EMBED['class'] = np.ones(df_ILS_EMBED.shape[0], np.int32) | |
261 | + | |
262 | + # print df_ILS_EMBED.dtypes | |
263 | + # print df_ILS.dtypes | |
264 | + # Form the intersection of two Index objects. Sortedness of the result is not guaranteed | |
265 | + df_ILS = df_ILS.append(df_ILS_EMBED, ignore_index=True) | |
266 | + df_ILS.to_csv(self.list_file, header=False, index=False, sep='\t') | |
267 | + | |
268 | + def embed(self, rate=None): | |
269 | + self._embed_inner(rate) | |
270 | + | |
271 | + def get_table(self): | |
272 | + if self.table != None: | |
273 | + return self.table | |
274 | + | |
275 | + if self.connection is None: | |
276 | + c = happybase.Connection('HPC-server') | |
277 | + self.connection = c | |
278 | + | |
279 | + tables = self.connection.tables() | |
280 | + if self.table_name not in tables: | |
281 | + families = {'cf_pic': dict(), | |
282 | + 'cf_info': dict(max_versions=10), | |
283 | + 'cf_tag': dict(), | |
284 | + 'cf_feat': dict(), | |
285 | + } | |
286 | + self.connection.create_table(name=self.table_name, families=families) | |
287 | + | |
288 | + table = self.connection.table(name=self.table_name) | |
289 | + | |
290 | + self.table = table | |
291 | + | |
292 | + return table | |
293 | + | |
294 | + | |
295 | + def store_img(self): | |
296 | + if self.table == None: | |
297 | + self.table = self.get_table() | |
298 | + | |
299 | + dict_databuf = {} | |
300 | + | |
301 | + with open(self.list_file, 'rb') as tsvfile: | |
302 | + tsvfile = csv.reader(tsvfile, delimiter='\t') | |
303 | + for line in tsvfile: | |
304 | + path_img = os.path.join(self.img_dir, line[0][:3], line[0][3:] + '.jpg') | |
305 | + if path_img: | |
306 | + with open(path_img, 'rb') as fpic: | |
307 | + dict_databuf[line[0] + '.jpg'] = fpic.read() | |
308 | + | |
309 | + try: | |
310 | + with self.table.batch(batch_size=5000) as b: | |
311 | + for imgname, imgdata in dict_databuf.items(): | |
312 | + b.put(imgname, {'cf_pic:data': imgdata}) | |
313 | + except ValueError: | |
314 | + raise | |
315 | + | |
316 | + | |
317 | + def store_info(self, infotype='all'): | |
318 | + if self.table == None: | |
319 | + self.table = self.get_table() | |
320 | + | |
321 | + dict_infobuf = {} | |
322 | + | |
323 | + with open(self.list_file, 'rb') as tsvfile: | |
324 | + tsvfile = csv.reader(tsvfile, delimiter='\t') | |
325 | + for line in tsvfile: | |
326 | + dict_infobuf[line[0] + '.jpg'] = line[1:-2] | |
327 | + | |
328 | + if infotype == 'all': | |
329 | + try: | |
330 | + with self.table.batch(batch_size=5000) as b: | |
331 | + for imgname, imginfo in dict_infobuf.items(): | |
332 | + b.put(imgname, | |
333 | + {'cf_info:width': imginfo[0], 'cf_info:height': imginfo[1], 'cf_info:size': imginfo[2], | |
334 | + 'cf_info:capacity': imginfo[3], | |
335 | + 'cf_info:quality': imginfo[4]}) | |
336 | + except ValueError: | |
337 | + raise | |
338 | + else: | |
339 | + raise Exception("Unknown infotype!") | |
340 | + | |
341 | + | |
342 | + def store_tag(self, tagtype='all'): | |
343 | + if self.table == None: | |
344 | + self.table = self.get_table() | |
345 | + | |
346 | + dict_tagbuf = {} | |
347 | + | |
348 | + with open(self.list_file, 'rb') as tsvfile: | |
349 | + tsvfile = csv.reader(tsvfile, delimiter='\t') | |
350 | + for line in tsvfile: | |
351 | + dict_tagbuf[line[0] + '.jpg'] = line[-2:] | |
352 | + | |
353 | + if tagtype == 'all': | |
354 | + try: | |
355 | + with self.table.batch(batch_size=5000) as b: | |
356 | + for imgname, imgtag in dict_tagbuf.items(): | |
357 | + b.put(imgname, {'cf_tag:chosen': imgtag[0], 'cf_tag:class': imgtag[1]}) | |
358 | + except ValueError: | |
359 | + raise | |
360 | + else: | |
361 | + raise Exception("Unknown tagtype!") | |
362 | + | |
363 | + | |
364 | + def store_feat(self, feattype='ibd'): | |
365 | + if self.table == None: | |
366 | + self.table = self.get_table() | |
367 | + | |
368 | + dict_featbuf = {} | |
369 | + for path, subdirs, files in os.walk(self.feat_dir): | |
370 | + for name in files: | |
371 | + featpath = os.path.join(path, name) | |
372 | + # print featpath | |
373 | + with open(featpath, 'rb') as featfile: | |
374 | + imgname = path.split('/')[-1] + name.replace('.' + feattype, '.jpg') | |
375 | + dict_featbuf[imgname] = featfile.read() | |
376 | + | |
377 | + try: | |
378 | + with self.table.batch(batch_size=5000) as b: | |
379 | + for imgname, featdesc in dict_featbuf.items(): | |
380 | + b.put(imgname, {'cf_feat:' + feattype: featdesc}) | |
381 | + except ValueError: | |
382 | + raise | |
383 | + pass | |
384 | + | |
385 | + | |
386 | + def load_data(self, mode='local', feattype='ibd', tagtype='class'): | |
387 | + INDEX = [] | |
388 | + X = [] | |
389 | + Y = [] | |
390 | + | |
391 | + if mode == "local": | |
392 | + | |
393 | + dict_dataset = {} | |
394 | + | |
395 | + with open(self.list_file, 'rb') as tsvfile: | |
396 | + tsvfile = csv.reader(tsvfile, delimiter='\t') | |
397 | + for line in tsvfile: | |
398 | + hash = line[0] | |
399 | + tag = line[-1] | |
400 | + path_feat = os.path.join(self.feat_dir, hash[:3], hash[3:] + '.' + feattype) | |
401 | + if path_feat: | |
402 | + with open(path_feat, 'rb') as featfile: | |
403 | + dict_dataset[hash] = (tag, json.loads(featfile.read())) | |
404 | + | |
405 | + for tag, feat in dict_dataset.values(): | |
406 | + X.append([item for sublist in feat for subsublist in sublist for item in subsublist]) | |
407 | + Y.append(int(tag)) | |
408 | + | |
409 | + elif mode == "remote" or mode == "hbase": | |
410 | + if self.table == None: | |
411 | + self.table = self.get_table() | |
412 | + | |
413 | + col_feat, col_tag = 'cf_feat:' + feattype, 'cf_tag:' + tagtype | |
414 | + for key, data in self.table.scan(columns=[col_feat, col_tag]): | |
415 | + X.append(json.loads(data[col_feat])) | |
416 | + Y.append(1 if data[col_tag] == 'True' else 0) | |
417 | + | |
418 | + elif mode == "spark" or mode == "cluster": | |
419 | + if self.sparkcontex == None: | |
420 | + self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') | |
421 | + | |
422 | + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} | |
423 | + for feat, tag in result: | |
424 | + X.append(feat) | |
425 | + Y.append(tag) | |
426 | + | |
427 | + else: | |
428 | + raise Exception("Unknown mode!") | |
429 | + | |
430 | + return X, Y | |
431 | + | |
432 | + | |
433 | + | |
434 | + | |
435 | + | |
436 | + | |
437 | + | |
438 | + | |
439 | + | |
440 | + | |
441 | + | |
442 | + |
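The new dumper above mirrors the existing DataILSVRC workflow end to end: hash-copy and list building, F5 embedding, feature extraction, and HBase storage per column family. A hypothetical driver script, assuming the module is importable under mdata (its filename is not shown in this diff view) and that HBase Thrift plus the Spark master are reachable on HPC-server:

    from mdata.ILSVRC_S import DataILSVRCS   # hypothetical module path

    dumper = DataILSVRCS(base_dir='/media/chunk/Elements/D/data/ImageNet/img/ILSVRC2013_DET_val',
                         category='Train')

    dumper.format()                       # hash-copy originals, write file-tag.tsv, Bernoulli(0.8) 'chosen' split
    dumper.embed(rate=0.1)                # F5-embed random payloads into rows marked chosen=1
    dumper.extract_feat(feattype='ibd')   # dump intra-block-diff features beside the images

    dumper.store_img()                    # JPEG bytes            -> cf_pic
    dumper.store_info()                   # width/height/size/... -> cf_info
    dumper.store_tag()                    # chosen/class          -> cf_tag
    dumper.store_feat('ibd')              # JSON feature vectors  -> cf_feat

    X, Y = dumper.load_data(mode='hbase', feattype='ibd', tagtype='class')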
mdata/ILSVRC.py
... | ... | @@ -419,7 +419,7 @@ class DataILSVRC(DataDumperBase): |
419 | 419 | if self.sparkcontex == None: |
420 | 420 | self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') |
421 | 421 | |
422 | - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} | |
422 | + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} | |
423 | 423 | for feat, tag in result: |
424 | 424 | X.append(feat) |
425 | 425 | Y.append(tag) |
mdata/MSR.py
... | ... | @@ -260,7 +260,7 @@ class DataMSR(DataDumperBase): |
260 | 260 | if self.sparkcontex == None: |
261 | 261 | self.sparkcontex = SC.Sparker(host='HPC-server', appname='ImageCV', master='spark://HPC-server:7077') |
262 | 262 | |
263 | - result = self.sparkcontex.read_habase(self.table_name) # result = {key:[feat,tag],...} | |
263 | + result = self.sparkcontex.read_hbase(self.table_name) # result = {key:[feat,tag],...} | |
264 | 264 | for key, data in result.items(): |
265 | 265 | X.append(data[0]) |
266 | 266 | Y.append(data[1]) |
mspark/SC.py
... | ... | @@ -47,7 +47,7 @@ class Sparker(object): |
47 | 47 | |
48 | 48 | self.model = None |
49 | 49 | |
50 | - def read_habase(self, table_name, columns=None): | |
50 | + def read_hbase(self, table_name, func=None, collect=False): | |
51 | 51 | """ |
52 | 52 | ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data |
53 | 53 | |
... | ... | @@ -59,7 +59,7 @@ class Sparker(object): |
59 | 59 | """ |
60 | 60 | hconf = {"hbase.zookeeper.quorum": self.host, |
61 | 61 | "hbase.mapreduce.inputtable": table_name, |
62 | - } | |
62 | + } | |
63 | 63 | |
64 | 64 | hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"], |
65 | 65 | keyClass=hparams["readKeyClass"], |
... | ... | @@ -67,11 +67,16 @@ class Sparker(object): |
67 | 67 | keyConverter=hparams["readKeyConverter"], |
68 | 68 | valueConverter=hparams["readValueConverter"], |
69 | 69 | conf=hconf) |
70 | - hbase_rdd = hbase_rdd.map(lambda x: parse_cv(x)) | |
71 | - output = hbase_rdd.collect() | |
72 | - return output | |
73 | 70 | |
74 | - def write_habase(self, table_name, data): | |
71 | + parser = func if func != None else parse_cv | |
72 | + hbase_rdd = hbase_rdd.map(lambda x: parser(x)) | |
73 | + | |
74 | + if collect: | |
75 | + return hbase_rdd.collect() | |
76 | + else: | |
77 | + return hbase_rdd | |
78 | + | |
79 | + def write_hbase(self, table_name, data): | |
75 | 80 | """ |
76 | 81 | Data Format: |
77 | 82 | e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]] |
... | ... | @@ -82,7 +87,7 @@ class Sparker(object): |
82 | 87 | "mapreduce.outputformat.class": hparams["outputFormatClass"], |
83 | 88 | "mapreduce.job.output.key.class": hparams["writeKeyClass"], |
84 | 89 | "mapreduce.job.output.value.class": hparams["writeValueClass"], |
85 | - } | |
90 | + } | |
86 | 91 | |
87 | 92 | self.sc.parallelize(data).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( |
88 | 93 | conf=hconf, |
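With the refactor above, read_hbase returns the mapped RDD by default, so callers can keep the data distributed and supply their own row parser via func (the default remains parse_cv). A rough usage sketch, assuming an already-constructed Sparker instance; parse_my_row and the table name are hypothetical:

    def parse_my_row(row):
        # hypothetical replacement for the default parse_cv parser
        key, value = row
        return key, value

    rdd = sparker.read_hbase('some-table', func=parse_my_row)   # table name illustrative
    sample = rdd.filter(lambda kv: kv[1] is not None).take(10)  # stays distributed until take()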