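"""
Whole-pipeline tests/benchmarks for the ILSVRC image data: (re)build an HBase
table, run the parse -> info -> embed -> feature chain through Spark, and
write the extracted features back, timing each stage.
"""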
__author__ = 'chunk'

from ..mspark import SC
from ..common import *
from ..mdata import ILSVRC, ILSVRC_S
from pyspark.mllib.regression import LabeledPoint

import happybase


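# Sketch of the Timer helper this file relies on. Timer itself comes from
# ..common via the wildcard import above; this stand-in is an assumption
# about its interface (only mark()/report() are used below), not the real
# implementation.
import time


class _TimerSketch(object):
    def __init__(self):
        self._t0 = time.time()

    def mark(self):
        # Set / reset the reference instant.
        self._t0 = time.time()

    def report(self):
        # Print the seconds elapsed since the last mark().
        print '[time]elapsed: %f s' % (time.time() - self._t0)

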
def test_whole():
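    """Run the pipeline over the fixed table "ILSVRC2013_DET_val-Test_1",
    unioning the original images with their embedded variants before feature
    extraction; results are collected and counted, not written back."""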
    # Raw image + metadata columns.
    cols0 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class'
    ]
    # Same columns plus the extracted feature.
    cols1 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
        'cf_feat:ibd',
    ]
    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')

    # Reference version of the pipeline using the EXT embedder in one chain:
    # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
    #     .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
    #     .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
    #     .mapValues(lambda items: SC.rddfeat_ILS(items))

    # rddembed_ILS apparently yields one embedded record or None per input
    # (hence the filter and the union with the originals), unlike the EXT
    # variant above, which emits original + embedded records via flatMap.
    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues(
        lambda data: [data] + SC.rddinfo_ILS(data))
    rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(lambda x: x is not None)
    rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))
    print len(rdd_data.collect())
    # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
    #                     withdata=True)


def test_whole_ext(category='Train_100'):
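    """Variant that pulls rows out of HBase into memory first, processes them
    with sc.parallelize(), and writes features back through a happybase
    batch, timing each stage."""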
    timer = Timer()
    print '[time]category:', category
    print '[time]formatting table...'
    timer.mark()
    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category=category)
    dil.delete_table()
    # dil.format()
    dil.store_img()
    timer.report()

    print '[time]reading table...'
    timer.mark()
    table_name = dil.table_name
    connection = happybase.Connection('HPC-server')
    tables = connection.tables()
    if table_name not in tables:
        # LZO-compressed schema kept for reference only; the plain
        # single-version families below are what actually get created.
        families_compressed = {'cf_pic': dict(max_versions=2, compression='LZO'),
                               'cf_info': dict(max_versions=2, compression='LZO'),
                               'cf_tag': dict(max_versions=2, compression='LZO'),
                               'cf_feat': dict(max_versions=2, compression='LZO'),
                               }
        families = {'cf_pic': dict(max_versions=1),
                    'cf_info': dict(max_versions=1),
                    'cf_tag': dict(max_versions=1),
                    'cf_feat': dict(max_versions=1),
                    }
        connection.create_table(name=table_name, families=families)
    table = connection.table(name=table_name)
    cols = ['cf_pic:data']
    list_data = []
    for key, data in table.scan(columns=cols):
        data = data['cf_pic:data']
        list_data.append((key, data))
    timer.report()

    print '[time]processing...'
    timer.mark()
    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
    rdd_data = sparker.sc.parallelize(list_data, 30) \
        .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
        .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
        .mapValues(lambda items: SC.rddfeat_ILS(items))
    items = rdd_data.collect()
    timer.report()

    print '[time]writing table...'
    timer.mark()
    try:
        with table.batch(batch_size=5000) as b:
            for item in items:
                imgname, imginfo = item[0], item[1]
                b.put(imgname,
                      {
                          'cf_pic:data': imginfo[0],
                          'cf_info:width': str(imginfo[1]),
                          'cf_info:height': str(imginfo[2]),
                          'cf_info:size': str(imginfo[3]),
                          'cf_info:capacity': str(imginfo[4]),
                          'cf_info:quality': str(imginfo[5]),
                          'cf_info:rate': str(imginfo[6]),
                          'cf_tag:chosen': str(imginfo[7]),
                          'cf_tag:class': str(imginfo[8]),
                          'cf_feat:ibd': imginfo[9],
                      })
    except ValueError:
        raise
    timer.report()


def test_whole_sp(category='Train_100'):
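    """Pure-Spark variant: read, process, and write entirely through
    Sparker.read_hbase()/write_hbase(), timing each stage."""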
    timer = Timer()
    print '[time]category:', category
    print '[time]formatting table...'
    timer.mark()
    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category=category)
    dil.delete_table()
    # dil.format()
    dil.store_img()
    timer.report()
    table_name = dil.table_name

    feattype = 'ibd'
    cols = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
        'cf_feat:' + feattype,
    ]

    print '[time]processing...'
    timer.mark()
    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
    rdd_data = sparker.read_hbase(table_name, func=SC.rddparse_data_ILS,
                                  collect=False, parallelism=30) \
        .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
        .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
        .mapValues(lambda items: SC.rddfeat_ILS(items, feattype))

    print '[time]writing table...'
    sparker.write_hbase(table_name, rdd_data, fromrdd=True, columns=cols,
                        withdata=True)
    timer.report()
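
# Usage sketch (assumption -- the real driver lives elsewhere in the package;
# the relative imports above mean this module runs as part of the package,
# not as a standalone script):
#
#     test_whole_sp(category='Train_100')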