Blame view

test/test_whole.py 5.62 KB
d642d837   Chunk   staged.
1
2
3
__author__ = 'chunk'

from ..mspark import SC
1821e0e3   Chunk   benchmarking...
4
5
6
from ..common import *
from ..mdata import ILSVRC, ILSVRC_S

d642d837   Chunk   staged.
7
from pyspark.mllib.regression import LabeledPoint
54e2adda   Chunk   staged.
8
9
import happybase

5c9c44da   Chunk   staged.
10

54e2adda   Chunk   staged.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def test_whole():
    """Smoke-run the parse -> info -> embed -> feature pipeline on a fixed table.

    Reads the table "ILSVRC2013_DET_val-Test_1" through
    ``SC.Sparker.read_hbase``, prepends the raw data to the result of
    ``SC.rddinfo_ILS``, builds a second RDD with ``SC.rddembed_ILS``
    (rate=0.2), unions both, runs ``SC.rddfeat_ILS`` on the values and
    prints the number of resulting records.

    Nothing is written back: the ``write_hbase`` call at the end is
    commented out.
    """
    # Column layout used only by the disabled write_hbase call at the bottom.
    # NOTE(review): the other functions in this file write 'cf_feat:ibd';
    # 'cf_feat:bid' here may be a typo -- confirm before re-enabling the write.
    cols1 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
        'cf_feat:bid',
    ]

    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')

    # Alternative single-chain variant, kept for reference:
    # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
    # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
    # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
    # .mapValues(lambda items: SC.rddfeat_ILS(items))

    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues(
        lambda data: [data] + SC.rddinfo_ILS(data))
    # Entries for which rddembed_ILS yields None are dropped; identity
    # comparison is the correct test for None.
    rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(lambda x: x is not None)

    rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))

    # Only the record count is needed, so count on the cluster instead of
    # collecting every record to the driver.
    print(rdd_data.count())

    # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
    # withdata=True)
54e2adda   Chunk   staged.
53
54


1821e0e3   Chunk   benchmarking...
55
56
57
58
59
60
61
def test_whole_ext(category='Train_100'):
    """Run the whole pipeline for one category, timing each phase.

    Each phase is bracketed by ``timer.mark()`` / ``timer.report()``:

    1. "formating table": rebuild the table through ``ILSVRC.DataILSVRC``
       (``delete_table`` followed by ``store_img``).
    2. "reading table": scan the 'cf_pic:data' column through happybase
       into a local list of ``(key, data)`` pairs.
    3. "processing": pass that list to ``sparker.sc.parallelize(..., 30)``,
       then info -> embed (``SC.rddembed_ILS_EXT``, rate=0.05) -> feature,
       and collect the result.
    4. "writing table": put every collected item back through a happybase
       batch (batch_size=5000).

    :param category: forwarded to ``ILSVRC.DataILSVRC``; also printed.
    """
    timer = Timer()

    print('%s %s' % ('[time]category:', category))

    print('[time]formating table...')
    timer.mark()
    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category=category)
    dil.delete_table()
    # dil.format()
    dil.store_img()
    timer.report()

    print('[time]reading table...')
    timer.mark()
    table_name = dil.table_name
    connection = happybase.Connection('HPC-server')
    try:
        tables = connection.tables()
        if table_name not in tables:
            # An LZO-compressed layout (max_versions=2, compression='LZO' for
            # each family) used to be defined here but was never passed to
            # create_table; only the plain layout below is in effect.
            families = {'cf_pic': dict(max_versions=1),
                        'cf_info': dict(max_versions=1),
                        'cf_tag': dict(max_versions=1),
                        'cf_feat': dict(max_versions=1),
                        }
            connection.create_table(name=table_name, families=families)
        table = connection.table(name=table_name)

        cols = ['cf_pic:data']
        list_data = [(key, data['cf_pic:data']) for key, data in table.scan(columns=cols)]
        timer.report()

        print('[time]processing...')
        timer.mark()
        sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
        rdd_data = sparker.sc.parallelize(list_data, 30) \
            .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
            .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.05)) \
            .mapValues(lambda items: SC.rddfeat_ILS(items))
        items = rdd_data.collect()
        timer.report()

        print('[time]writing table...')
        timer.mark()
        with table.batch(batch_size=5000) as b:
            for item in items:
                imgname, imginfo = item[0], item[1]
                # imginfo layout as consumed here: [0] picture data,
                # [1..6] info fields, [7] chosen, [8] class, [9] feature.
                b.put(imgname,
                      {
                          'cf_pic:data': imginfo[0],
                          'cf_info:width': str(imginfo[1]),
                          'cf_info:height': str(imginfo[2]),
                          'cf_info:size': str(imginfo[3]),
                          'cf_info:capacity': str(imginfo[4]),
                          'cf_info:quality': str(imginfo[5]),
                          'cf_info:rate': str(imginfo[6]),
                          'cf_tag:chosen': str(imginfo[7]),
                          'cf_tag:class': str(imginfo[8]),
                          'cf_feat:ibd': imginfo[9],
                      })
        timer.report()
    finally:
        # Release the connection whether or not a phase failed.
        connection.close()
84648488   Chunk   reverted.
126
127


f4fb4381   Chunk   staged.
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def test_whole_sp(category='Train_100'):
    """Run the whole pipeline for one category with Spark doing the HBase I/O.

    The table is rebuilt through ``ILSVRC.DataILSVRC`` (``delete_table`` then
    ``store_img``), read with ``Sparker.read_hbase``, run through
    info -> embed (``SC.rddembed_ILS_EXT``, rate=0.2) -> feature ('ibd'),
    and written back with ``Sparker.write_hbase``.

    The timer is marked once before the processing stage and reported once
    after the write, so that single report covers both stages.

    :param category: forwarded to ``ILSVRC.DataILSVRC``; also printed.
    """
    stopwatch = Timer()

    print('%s %s' % ('[time]category:', category))

    print('[time]formating table...')
    stopwatch.mark()
    dataset = ILSVRC.DataILSVRC(
        base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val',
        category=category,
    )
    dataset.delete_table()
    # dataset.format()  (call left disabled)
    dataset.store_img()
    stopwatch.report()

    table_name = dataset.table_name
    feattype = 'ibd'
    # Columns handed to write_hbase; the feature column name depends on feattype.
    cols = [
        'cf_pic:data',
        'cf_info:width', 'cf_info:height', 'cf_info:size',
        'cf_info:capacity', 'cf_info:quality', 'cf_info:rate',
        'cf_tag:chosen', 'cf_tag:class',
    ]
    cols.append('cf_feat:' + feattype)

    print('[time]processing...')
    stopwatch.mark()
    spark = SC.Sparker(
        host='HPC-server',
        appname='ImageILSVRC-S',
        master='spark://HPC-server:7077',
    )
    parsed = spark.read_hbase(table_name, func=SC.rddparse_data_ILS,
                              collect=False, parallelism=30)
    with_info = parsed.mapValues(lambda data: [data] + SC.rddinfo_ILS(data))
    embedded = with_info.flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2))
    rdd_data = embedded.mapValues(lambda items: SC.rddfeat_ILS(items, feattype))

    print('[time]writing table...')
    spark.write_hbase(table_name, rdd_data, fromrdd=True, columns=cols,
                      withdata=True)
    stopwatch.report()
84648488   Chunk   reverted.