Commit 54e2adda230421077d556c3d775d9f399b82652e

Authored by Chunk
1 parent d642d837
Exists in master and in 1 other branch: refactor

staged.

mspark/SC.py
... ... @@ -247,6 +247,77 @@ def format_out(row, cols, withdata=False):
247 247 puts.append((key, [key] + col + [str(data)]))
248 248 return puts
249 249  
  250 +# scconf = SparkConf()
  251 +# scconf.setSparkHome("HPC-server") \
  252 +# .setMaster("spark://HPC-server:7077") \
  253 +# .setAppName("example")
  254 +# sc = SparkContext(conf=scconf)
  255 +#
  256 +#
  257 +# def read_hbase(table_name, func=None, collect=False):
  258 +# """
  259 +# ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
  260 +#
  261 +# Filter format:
  262 +# columns=['cf1:col1', 'cf1:col2']
  263 +# or
  264 +# columns=['cf1']
  265 +#
  266 +# """
  267 +#
  268 +# hconf = {
  269 +# "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
  270 +# # "hbase.zookeeper.quorum": self.host,
  271 +# "hbase.mapreduce.inputtable": table_name,
  272 +# }
  273 +#
  274 +# hbase_rdd = sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
  275 +# keyClass=hparams["readKeyClass"],
  276 +# valueClass=hparams["readValueClass"],
  277 +# keyConverter=hparams["readKeyConverter"],
  278 +# valueConverter=hparams["readValueConverter"],
  279 +# conf=hconf)
  280 +#
  281 +# parser = func if func != None else rddparse_data_CV
  282 +# hbase_rdd = hbase_rdd.map(lambda x: parser(x))
  283 +#
  284 +# if collect:
  285 +# return hbase_rdd.collect()
  286 +# else:
  287 +# return hbase_rdd
  288 +#
  289 +#
  290 +# def write_hbase(table_name, data, fromrdd=False, columns=None, withdata=False):
  291 +# """
  292 +# Data Format: (Deprecated)
  293 +# e.g. [["row8", "f1", "", "caocao cao"], ["row9", "f1", "c1", "asdfg hhhh"]]
  294 +#
  295 +# Data(from dictionary):
  296 +# e.g. data ={'row1':[1,3400,'hello'], 'row2':[34,5000,'here in mine']},
  297 +# cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
  298 +# Data(from Rdd):
  299 +# e.g. data =[('row1',[1,3400,'hello']), ('row2',[34,5000,'here in mine'])],
  300 +# cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
  301 +# """
  302 +# hconf = {
  303 +# "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", # "hbase.zookeeper.quorum": self.host,
  304 +# "hbase.mapreduce.inputtable": table_name,
  305 +# "hbase.mapred.outputtable": table_name,
  306 +# "mapreduce.outputformat.class": hparams["outputFormatClass"],
  307 +# "mapreduce.job.output.key.class": hparams["writeKeyClass"],
  308 +# "mapreduce.job.output.value.class": hparams["writeValueClass"],
  309 +# }
  310 +# cols = [col.split(':') for col in columns]
  311 +# if not fromrdd:
  312 +# rdd_data = sc.parallelize(data)
  313 +# else:
  314 +# rdd_data = data
  315 +#
  316 +# rdd_data.flatMap(lambda x: format_out(x, cols, withdata=withdata)).saveAsNewAPIHadoopDataset(
  317 +# conf=hconf,
  318 +# keyConverter=hparams["writeKeyConverter"],
  319 +# valueConverter=hparams["writeValueConverter"])
  320 +
250 321  
251 322 class Sparker(object):
252 323 def __init__(self, host='HPC-server', appname='NewPySparkApp', **kwargs):
... ... @@ -283,9 +354,9 @@ class Sparker(object):
283 354 """
284 355  
285 356 hconf = {
286   - "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
287   - # "hbase.zookeeper.quorum": self.host,
288   - "hbase.mapreduce.inputtable": table_name,
  357 + "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2",
  358 + # "hbase.zookeeper.quorum": self.host,
  359 + "hbase.mapreduce.inputtable": table_name,
289 360 }
290 361  
291 362 hbase_rdd = self.sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
... ... @@ -316,12 +387,12 @@ class Sparker(object):
316 387 cols = ['cf_info:id', 'cf_info:size', 'cf_tag:desc']
317 388 """
318 389 hconf = {
319   - "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", # "hbase.zookeeper.quorum": self.host,
320   - "hbase.mapreduce.inputtable": table_name,
321   - "hbase.mapred.outputtable": table_name,
322   - "mapreduce.outputformat.class": hparams["outputFormatClass"],
323   - "mapreduce.job.output.key.class": hparams["writeKeyClass"],
324   - "mapreduce.job.output.value.class": hparams["writeValueClass"],
  390 + "hbase.zookeeper.quorum": "HPC-server, HPC, HPC2", # "hbase.zookeeper.quorum": self.host,
  391 + "hbase.mapreduce.inputtable": table_name,
  392 + "hbase.mapred.outputtable": table_name,
  393 + "mapreduce.outputformat.class": hparams["outputFormatClass"],
  394 + "mapreduce.job.output.key.class": hparams["writeKeyClass"],
  395 + "mapreduce.job.output.value.class": hparams["writeValueClass"],
325 396 }
326 397 cols = [col.split(':') for col in columns]
327 398 if not fromrdd:
... ...
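
For context, a minimal usage sketch of the HBase helpers touched in this file, driven the same way the Sparker methods are used in the tests. The table name, parser function, and column list are illustrative only; hparams and the HBase key/value converter classes are assumed to be defined elsewhere in SC.py, and the RDD values are assumed to already be lists that line up with the chosen columns.

    from mspark import SC

    sparker = SC.Sparker(host='HPC-server', appname='ExampleApp',
                         master='spark://HPC-server:7077')

    # Read: every HBase row is run through the parser function;
    # collect=True would return a plain list instead of an RDD.
    rdd_data = sparker.read_hbase('ILSVRC2013_DET_val-Test_1',
                                  func=SC.rddparse_data_ILS, collect=False)

    # Write: columns follow the 'family:qualifier' convention of the docstrings,
    # and each row's value list is expected to line up with this column list.
    cols = ['cf_info:width', 'cf_info:height', 'cf_info:size']
    sparker.write_hbase('ILSVRC2013_DET_val-Test_1', rdd_data, fromrdd=True,
                        columns=cols, withdata=False)
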
test/test_data.py
... ... @@ -93,13 +93,13 @@ def test_ILSVRC_S_LOCAL():
93 93 def test_ILSVRC_S_SPARK():
94 94 timer = Timer()
95 95  
96   - # timer.mark()
97   - # dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Test_1')
98   - # dil.delete_table()
99   - # dil.format()
100   - # dil.store_img()
101   - # timer.report()
102   - # return
  96 + timer.mark()
  97 + dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category='Test_1')
  98 + dil.delete_table()
  99 + dil.format()
  100 + dil.store_img()
  101 + timer.report()
  102 + return
103 103  
104 104 dils = ILSVRC_S.DataILSVRC_S(base='ILSVRC2013_DET_val', category='Test_1')
105 105  
... ...
test/test_whole.py
... ... @@ -2,48 +2,98 @@ __author__ = 'chunk'
2 2  
3 3 from ..mspark import SC
4 4 from pyspark.mllib.regression import LabeledPoint
5   -
6   -
7   -cols0 = [
8   - 'cf_pic:data',
9   - 'cf_info:width',
10   - 'cf_info:height',
11   - 'cf_info:size',
12   - 'cf_info:capacity',
13   - 'cf_info:quality',
14   - 'cf_info:rate',
15   - 'cf_tag:chosen',
16   - 'cf_tag:class'
17   -]
18   -cols1 = [
19   - 'cf_pic:data',
20   - 'cf_info:width',
21   - 'cf_info:height',
22   - 'cf_info:size',
23   - 'cf_info:capacity',
24   - 'cf_info:quality',
25   - 'cf_info:rate',
26   - 'cf_tag:chosen',
27   - 'cf_tag:class',
28   - 'cf_feat:bid',
29   -]
30   -
31   -sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
32   -
33   -rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
34   - .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
35   - .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
36   - .mapValues(lambda items: SC.rddfeat_ILS(items))
37   -
38   -sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
39   - withdata=True)
40   -
41   -
42   -
43   -
44   -
45   -
46   -
  5 +import happybase
  6 +
  7 +def test_whole():
  8 + cols0 = [
  9 + 'cf_pic:data',
  10 + 'cf_info:width',
  11 + 'cf_info:height',
  12 + 'cf_info:size',
  13 + 'cf_info:capacity',
  14 + 'cf_info:quality',
  15 + 'cf_info:rate',
  16 + 'cf_tag:chosen',
  17 + 'cf_tag:class'
  18 + ]
  19 + cols1 = [
  20 + 'cf_pic:data',
  21 + 'cf_info:width',
  22 + 'cf_info:height',
  23 + 'cf_info:size',
  24 + 'cf_info:capacity',
  25 + 'cf_info:quality',
  26 + 'cf_info:rate',
  27 + 'cf_tag:chosen',
  28 + 'cf_tag:class',
  29 + 'cf_feat:bid',
  30 + ]
  31 +
  32 + sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
  33 +
  34 + # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
  35 + # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
  36 + # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
  37 + # .mapValues(lambda items: SC.rddfeat_ILS(items))
  38 +
  39 + rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues(
  40 + lambda data: [data] + SC.rddinfo_ILS(data))
  41 + rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(lambda x: x != None)
  42 +
  43 + rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))
  44 +
  45 + print len(rdd_data.collect())
  46 +
  47 + # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
  48 + # withdata=True)
  49 +
  50 +
  51 +def test_whole_ext():
  52 + table_name = "ILSVRC2013_DET_val-Test_1"
  53 + connection = happybase.Connection('HPC-server')
  54 + tables = connection.tables()
  55 + if table_name not in tables:
  56 + families = {'cf_pic': dict(),
  57 + 'cf_info': dict(max_versions=10),
  58 + 'cf_tag': dict(),
  59 + 'cf_feat': dict(),
  60 + }
  61 + connection.create_table(name=table_name, families=families)
  62 + table = connection.table(name=table_name)
  63 +
  64 + cols = ['cf_pic:data']
  65 + list_data = []
  66 + for key, data in table.scan(columns=cols):
  67 + data = data['cf_pic:data']
  68 + list_data.append((key,data))
  69 +
  70 + sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
  71 + rdd_data = sparker.sc.parallelize(list_data,20)\
  72 + .mapValues(lambda data: [data] + SC.rddinfo_ILS(data))\
  73 + .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2))\
  74 + .mapValues(lambda items: SC.rddfeat_ILS(items))
  75 +
  76 + rrr = rdd_data.collect()
  77 + print "-----------------",len(rrr),"===================="
  78 + print "+++++++++++++++++",rrr[0],"**********************"
  79 + # try:
  80 + # with table.batch(batch_size=5000) as b:
  81 + # for imgname, imginfo in rdd_data.collect().items():
  82 + # b.put(imgname,
  83 + # {
  84 + # 'cf_pic:data': imginfo[0],
  85 + # 'cf_info:width': str(imginfo[1]),
  86 + # 'cf_info:height': str(imginfo[2]),
  87 + # 'cf_info:size': str(imginfo[3]),
  88 + # 'cf_info:capacity': str(imginfo[4]),
  89 + # 'cf_info:quality': str(imginfo[5]),
  90 + # 'cf_info:rate': str(imginfo[6]),
  91 + # 'cf_tag:chosen': str(imginfo[7]),
  92 + # 'cf_tag:class': str(imginfo[8]),
  93 + # 'cf_feat:' + feattype: imginfo[9],
  94 + # })
  95 + # except ValueError:
  96 + # raise
47 97  
48 98  
49 99  
... ...
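
Side note on the commented-out write-back in test_whole_ext: rdd_data.collect() returns a list of (key, value) pairs, not a dict, so the loop would iterate the list directly rather than calling .items(). A minimal sketch under that assumption (feattype is a placeholder for the feature column qualifier, and imginfo is assumed to keep the [data, width, height, size, capacity, quality, rate, chosen, class, feat] layout implied by the column names):

    try:
        # happybase batch: puts are buffered and flushed every batch_size rows
        with table.batch(batch_size=5000) as b:
            for imgname, imginfo in rdd_data.collect():
                b.put(imgname,
                      {
                          'cf_pic:data': imginfo[0],
                          'cf_info:width': str(imginfo[1]),
                          'cf_info:height': str(imginfo[2]),
                          'cf_info:size': str(imginfo[3]),
                          'cf_info:capacity': str(imginfo[4]),
                          'cf_info:quality': str(imginfo[5]),
                          'cf_info:rate': str(imginfo[6]),
                          'cf_tag:chosen': str(imginfo[7]),
                          'cf_tag:class': str(imginfo[8]),
                          'cf_feat:' + feattype: imginfo[9],
                      })
    except ValueError:
        raise
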