Commit d642d837fa065c133187e9c8f8dbc0283103bfe6

Authored by Chunk
1 parent 489c5608
Exists in master and in 1 other branch: refactor

staged.

Showing 2 changed files with 69 additions and 13 deletions
@@ -155,6 +155,7 @@ def rddembed_ILS(row, rate=None):
     tmpf_src.close()
     tmpf_dst.close()
 
+
 def rddembed_ILS_EXT(row, rate=None):
     """
     input:
@@ -188,7 +189,7 @@ def rddembed_ILS_EXT(row, rate=None):
         raw = tmpf_dst.read()
         index = md5(raw).hexdigest()
 
-        return [row,(index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))]
+        return [row, (index + '.jpg', [raw] + rddinfo_ILS(raw, embed_rate, 0, 1))]
 
     except Exception as e:
         print e
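Aside from the spacing fix, the return shape here matters downstream: the function hands back a two-element list, the untouched input row plus a new (key, value) record for the stego image, which is why callers expand it with flatMap rather than map. A minimal plain-Python sketch of that contract (the stand-in function and sample rows are hypothetical, not the project's code):

from hashlib import md5

def embed_ext(row, rate=None):
    # Stand-in for rddembed_ILS_EXT: keep the original row and emit one
    # extra record keyed by the md5 of the (fake) embedded image bytes.
    raw = 'fake-jpeg-bytes'
    index = md5(raw).hexdigest()
    return [row, (index + '.jpg', [raw, rate])]

rows = [('img001.jpg', ['bytes0']), ('img002.jpg', ['bytes1'])]
expanded = [out for row in rows for out in embed_ext(row, rate=0.2)]
print len(expanded)  # 4: each input yields itself plus one embedded record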
@@ -282,10 +283,10 @@ class Sparker(object):
         """
 
         hconf = {
-            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
-            #"hbase.zookeeper.quorum": self.host,
-            "hbase.mapreduce.inputtable": table_name,
-        }
+            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
+            # "hbase.zookeeper.quorum": self.host,
+            "hbase.mapreduce.inputtable": table_name,
+        }
 
         hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                             keyClass=hparams["readKeyClass"],
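For readers tracing the read path: this hconf plus the hparams classes feed straight into SparkContext.newAPIHadoopRDD. A minimal standalone sketch, assuming the stock HBase TableInputFormat and its usual key/value classes (the diff never shows the actual hparams values, so these class names are assumptions):

from pyspark import SparkContext

sc = SparkContext(appName='hbase-read-sketch')
hconf = {
    "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
    "hbase.mapreduce.inputtable": "ILSVRC2013_DET_val-Test_1",
}
# Assumed stand-ins for hparams["inputFormatClass"] etc.; a converter such
# as HBaseResultToStringConverter from the Spark examples jar may also be
# needed to turn Result objects into Python strings.
hbase_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    keyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    valueClass="org.apache.hadoop.hbase.client.Result",
    conf=hconf)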
@@ -315,14 +316,13 @@ class Sparker(object):
         cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
         """
         hconf = {
-            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
-            #"hbase.zookeeper.quorum": self.host,
-            "hbase.mapreduce.inputtable": table_name,
-            "hbase.mapred.outputtable": table_name,
-            "mapreduce.outputformat.class": hparams["outputFormatClass"],
-            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
-            "mapreduce.job.output.value.class": hparams["writeValueClass"],
-        }
+            "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",  # "hbase.zookeeper.quorum": self.host,
+            "hbase.mapreduce.inputtable": table_name,
+            "hbase.mapred.outputtable": table_name,
+            "mapreduce.outputformat.class": hparams["outputFormatClass"],
+            "mapreduce.job.output.key.class": hparams["writeKeyClass"],
+            "mapreduce.job.output.value.class": hparams["writeValueClass"],
+        }
         cols = [col.split(':') for col in columns]
         if not fromrdd:
             rdd_data = self.sc.parallelize(data)
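The write-side mirror of the same idea: a conf like this is what an RDD's saveAsNewAPIHadoopDataset consumes. A sketch under the same caveat that the concrete hparams values are assumptions; the converter classes named here ship with the Spark examples jar, not Spark core:

# Each element is (rowkey, [rowkey, column_family, qualifier, value]),
# the shape StringListToPutConverter expects.
write_rdd = sc.parallelize([("row1", ["row1", "cf_info", "size", "1024"])])
wconf = {
    "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
    "hbase.mapred.outputtable": "ILSVRC2013_DET_val-Test_1",
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
}
write_rdd.saveAsNewAPIHadoopDataset(
    conf=wconf,
    keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")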
test/test_whole.py 0 → 100644
@@ -0,0 +1,56 @@
+__author__ = 'chunk'
+
+from ..mspark import SC
+from pyspark.mllib.regression import LabeledPoint
+
+
+cols0 = [
+    'cf_pic:data',
+    'cf_info:width',
+    'cf_info:height',
+    'cf_info:size',
+    'cf_info:capacity',
+    'cf_info:quality',
+    'cf_info:rate',
+    'cf_tag:chosen',
+    'cf_tag:class'
+]
+cols1 = [
+    'cf_pic:data',
+    'cf_info:width',
+    'cf_info:height',
+    'cf_info:size',
+    'cf_info:capacity',
+    'cf_info:quality',
+    'cf_info:rate',
+    'cf_tag:chosen',
+    'cf_tag:class',
+    'cf_feat:bid',
+]
+
+sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
+
+rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
+    .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
+    .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
+    .mapValues(lambda items: SC.rddfeat_ILS(items))
+
+sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
+                    withdata=True)
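The new test wires the whole pipeline together: read rows from HBase, attach image metadata, expand each cover image into cover-plus-stego records, extract features, and write everything back. A plain-Python mock of the same dataflow, with every helper reduced to a hypothetical stand-in, shows how the stages compose:

def map_values(pairs, f):
    # mapValues analogue for plain (key, value) lists
    return [(k, f(v)) for k, v in pairs]

def info(data):             # stands in for SC.rddinfo_ILS
    return [len(data)]

def embed_ext(pair, rate):  # stands in for SC.rddembed_ILS_EXT
    key, items = pair
    return [pair, (key + '.stego', items + [rate])]

def feat(items):            # stands in for SC.rddfeat_ILS
    return items + ['features']

rows = [('k1', 'raw-bytes')]                      # read_hbase output (assumed shape)
rows = map_values(rows, lambda d: [d] + info(d))  # .mapValues: attach metadata
rows = [out for p in rows for out in embed_ext(p, 0.2)]  # .flatMap: cover + stego
rows = map_values(rows, feat)                     # .mapValues: extract features
print rows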