Blame view

test/test_whole.py 4.03 KB
d642d837   Chunk   staged.
1
2
3
__author__ = 'chunk'

from ..mspark import SC
1821e0e3   Chunk   benchmarking...
4
5
6
from ..common import *
from ..mdata import ILSVRC, ILSVRC_S

d642d837   Chunk   staged.
7
from pyspark.mllib.regression import LabeledPoint
54e2adda   Chunk   staged.
8
9
import happybase

5c9c44da   Chunk   staged.
10

54e2adda   Chunk   staged.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def test_whole():
    """Run the ILSVRC info/embed/feature pipeline on Spark and print the record count.

    Steps, as performed below:
      1. Read table "ILSVRC2013_DET_val-Test_1" through ``sparker.read_hbase``
         (parsed with ``SC.rddparse_data_ILS``) and turn each value into
         ``[data] + SC.rddinfo_ILS(data)``.
      2. Apply ``SC.rddembed_ILS(x, rate=0.2)`` to every record and drop the
         records for which it returns ``None``.
      3. Union the original and embedded records and run ``SC.rddfeat_ILS``
         on every value.
      4. Collect the result on the driver and print how many records it holds.

    Nothing is written back to HBase; the write-back call is disabled below.
    """
    # Column layouts: cols0 is the base layout, cols1 adds the feature column.
    # Both are only referenced by the disabled write_hbase call at the end.
    cols0 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
    ]
    # NOTE(review): test_whole_ext writes the feature under 'cf_feat:ibd',
    # not 'cf_feat:bid' -- confirm which qualifier is intended before
    # re-enabling the write-back.
    cols1 = cols0 + ['cf_feat:bid']

    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
                         master='spark://HPC-server:7077')

    # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
    # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
    # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
    # .mapValues(lambda items: SC.rddfeat_ILS(items))

    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS,
                                  collect=False).mapValues(
        lambda data: [data] + SC.rddinfo_ILS(data))
    # Identity test instead of `!= None`: it cannot be hijacked by a custom
    # or element-wise __ne__ on the record object.
    rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(
        lambda x: x is not None)

    rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))

    # Single-argument call form prints the same text under Python 2 and 3.
    print(len(rdd_data.collect()))

    # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
    #                     withdata=True)
1821e0e3   Chunk   benchmarking...
55
56
57
58
59
60
61


def test_whole_ext(category='Train_100'):
    """Benchmark the full store / read / process / write cycle for one category.

    Each phase is bracketed by ``timer.mark()`` / ``timer.report()``:
      * formatting: rebuild the table via ``ILSVRC.DataILSVRC`` (delete,
        format, store images);
      * reading: scan the 'cf_pic:data' column of the table through happybase;
      * processing: run info / embed (rate=0.2) / feature extraction on Spark
        and collect the results on the driver;
      * writing: put every processed row back into the table in batches.

    :param category: category passed to ``ILSVRC.DataILSVRC``; also printed
        in the first log line.
    """
    timer = Timer()

    # %-formatting keeps the output identical to the old two-argument print
    # statement while remaining valid under both Python 2 and 3.
    print('[time]category: %s' % (category,))

    print('[time]formating table...')
    timer.mark()
    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val',
                            category=category)
    dil.delete_table()
    dil.format()
    dil.store_img()
    timer.report()

    print('[time]reading table...')
    timer.mark()
    table_name = dil.table_name
    connection = happybase.Connection('HPC-server')
    try:
        # Create the table only if it is not already present.
        if table_name not in connection.tables():
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            connection.create_table(name=table_name, families=families)
        table = connection.table(name=table_name)

        # (row key, raw picture data) pairs for every row in the table.
        list_data = [(key, row['cf_pic:data'])
                     for key, row in table.scan(columns=['cf_pic:data'])]
        timer.report()

        print('[time]processing...')
        timer.mark()
        sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S',
                             master='spark://HPC-server:7077')
        rdd_data = sparker.sc.parallelize(list_data, 40) \
            .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
            .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
            .mapValues(lambda items: SC.rddfeat_ILS(items))
        # RDD transformations are lazy: nothing runs until an action is
        # called. Collect here so the Spark work is billed to the
        # "processing" phase instead of leaking into the "writing" phase.
        processed = rdd_data.collect()
        timer.report()

        print('[time]writing table...')
        timer.mark()
        with table.batch(batch_size=5000) as b:
            for imgname, imginfo in processed:
                # NOTE(review): the feature qualifier here is 'cf_feat:ibd'
                # while cols1 in test_whole lists 'cf_feat:bid' -- confirm
                # which one is intended.
                b.put(imgname,
                      {
                          'cf_pic:data': imginfo[0],
                          'cf_info:width': str(imginfo[1]),
                          'cf_info:height': str(imginfo[2]),
                          'cf_info:size': str(imginfo[3]),
                          'cf_info:capacity': str(imginfo[4]),
                          'cf_info:quality': str(imginfo[5]),
                          'cf_info:rate': str(imginfo[6]),
                          'cf_tag:chosen': str(imginfo[7]),
                          'cf_tag:class': str(imginfo[8]),
                          'cf_feat:ibd': imginfo[9],
                      })
        timer.report()
    finally:
        # Release the connection even when a phase fails.
        connection.close()
1821e0e3   Chunk   benchmarking...

84648488   Chunk   reverted.

f4fb4381   Chunk   staged.

84648488   Chunk   reverted.