Commit 26616791c5ea340b07ab5fbeaf3d2922812d7d9f

Authored by Chunk
1 parent 13a594f1
Exists in master and in 1 other branch refactor

RDD-hbase bug fixed.(with 'repartition()')

Showing 2 changed files with 9 additions and 5 deletions   Show diff stats
mspark/SC.py
... ... @@ -247,9 +247,10 @@ def format_out(row, cols, withdata=False):
247 247 puts.append((key, [key] + col + [str(data)]))
248 248 return puts
249 249  
  250 +
250 251 # scconf = SparkConf()
251 252 # scconf.setSparkHome("HPC-server") \
252   -# .setMaster("spark://HPC-server:7077") \
  253 +# .setMaster("spark://HPC-server:7077") \
253 254 # .setAppName("example")
254 255 # sc = SparkContext(conf=scconf)
255 256 #
... ... @@ -342,7 +343,7 @@ class Sparker(object):
342 343 self.model = None
343 344  
344 345  
345   - def read_hbase(self, table_name, func=None, collect=False):
  346 + def read_hbase(self, table_name, func=None, collect=False, parallelism=40):
346 347 """
347 348 ref - http://happybase.readthedocs.org/en/latest/user.html#retrieving-data
348 349  
... ... @@ -372,7 +373,7 @@ class Sparker(object):
372 373 if collect:
373 374 return hbase_rdd.collect()
374 375 else:
375   - return hbase_rdd
  376 + return hbase_rdd.repartition(parallelism)
376 377  
377 378 def write_hbase(self, table_name, data, fromrdd=False, columns=None, withdata=False):
378 379 """
... ...
test/test_data.py
... ... @@ -93,7 +93,7 @@ def test_ILSVRC_S_LOCAL():
93 93 timer.report()
94 94  
95 95  
96   -def test_ILSVRC_S_SPARK(category='Train_200'):
  96 +def test_ILSVRC_S_SPARK(category='Train_1000'):
97 97 timer = Timer()
98 98  
99 99 timer.mark()
... ... @@ -102,7 +102,7 @@ def test_ILSVRC_S_SPARK(category='Train_200'):
102 102 dil.format()
103 103 dil.store_img()
104 104 timer.report()
105   - return
  105 + # return
106 106  
107 107 dils = ILSVRC_S.DataILSVRC_S(base='ILSVRC2013_DET_val', category=category)
108 108  
... ... @@ -110,6 +110,9 @@ def test_ILSVRC_S_SPARK(category='Train_200'):
110 110 dils._extract_data(mode='spark', writeback=False)
111 111 timer.report()
112 112  
  113 + # print dils.rdd_data.count() # pass
  114 + # return
  115 +
113 116 timer.mark()
114 117 dils._embed_data(mode='spark', rate=0.2, readforward=False, writeback=False)
115 118 timer.report()
... ...