test/test_whole.py

__author__ = 'chunk'
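
# End-to-end tests for the ILSVRC image pipeline: load pictures, compute
# per-image info, embed data at a fixed rate, extract features on Spark,
# and persist the results to HBase.  (The behaviour of the SC.rdd* helpers
# is inferred from how they are used below.)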

from ..mspark import SC
from ..common import *
from ..mdata import ILSVRC, ILSVRC_S

from pyspark.mllib.regression import LabeledPoint
import happybase

def test_whole():
    cols0 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class'
    ]
    cols1 = [
        'cf_pic:data',
        'cf_info:width',
        'cf_info:height',
        'cf_info:size',
        'cf_info:capacity',
        'cf_info:quality',
        'cf_info:rate',
        'cf_tag:chosen',
        'cf_tag:class',
        'cf_feat:bid',
    ]
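    # cols0 covers the picture, info and tag columns; cols1 additionally
    # lists the feature column used when writing features back to HBase.
    # (Note: cols1 names it 'cf_feat:bid', while the write path in
    # test_whole_ext below uses 'cf_feat:ibd'.)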

    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
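    # Sparker is a thin wrapper around a SparkContext (cf. sparker.sc in
    # test_whole_ext), configured against the standalone master on HPC-server.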

    # rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False) \
    # .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
    # .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
    # .mapValues(lambda items: SC.rddfeat_ILS(items))
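    # The commented-out chain above reads and embeds in one pass via
    # rddembed_ILS_EXT; the active code below does it step by step with
    # rddembed_ILS and unions the embedded copies back into the original RDD.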

    rdd_data = sparker.read_hbase("ILSVRC2013_DET_val-Test_1", func=SC.rddparse_data_ILS, collect=False).mapValues(
        lambda data: [data] + SC.rddinfo_ILS(data))
    rdd_data_ext = rdd_data.map(lambda x: SC.rddembed_ILS(x, rate=0.2)).filter(lambda x: x is not None)

    rdd_data = rdd_data.union(rdd_data_ext).mapValues(lambda items: SC.rddfeat_ILS(items))
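    # rdd_data now holds the original images plus their embedded variants
    # (rddembed_ILS presumably returns None for images it skips, hence the
    # filter above), each value run through SC.rddfeat_ILS.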

    print len(rdd_data.collect())
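    # collect() forces the whole pipeline to execute; the count printed here
    # serves as a quick sanity check before (optionally) writing back.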

    # sparker.write_hbase("ILSVRC2013_DET_val-Test_1", rdd_data, fromrdd=True, columns=cols1,
    #                     withdata=True)


def test_whole_ext(category='Train_100'):
    timer = Timer()
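    # Benchmarks the same pipeline stage by stage for one category, marking
    # and reporting the Timer around each '[time]...' section.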

    print '[time]category:', category

    print '[time]formatting table...'
    timer.mark()
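    # Stage 1: rebuild the HBase table for this category from the local
    # ILSVRC2013 validation dump (delete, format, upload the images).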
    dil = ILSVRC.DataILSVRC(base_dir='/data/hadoop/ImageNet/ILSVRC/ILSVRC2013_DET_val', category=category)
    dil.delete_table()
    dil.format()
    dil.store_img()
    timer.report()

    print '[time]reading table...'
    timer.mark()
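    # Stage 2: read the raw picture bytes (cf_pic:data) of every row into
    # memory over happybase, creating the table first if it does not exist.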
    table_name = dil.table_name
    connection = happybase.Connection('HPC-server')
    tables = connection.tables()
    if table_name not in tables:
        families = {'cf_pic': dict(),
                    'cf_info': dict(max_versions=10),
                    'cf_tag': dict(),
                    'cf_feat': dict(),
                    }
        connection.create_table(name=table_name, families=families)
    table = connection.table(name=table_name)

    cols = ['cf_pic:data']
    list_data = []
    for key, data in table.scan(columns=cols):
        data = data['cf_pic:data']
        list_data.append((key, data))
    timer.report()

    print '[time]processing...'
    timer.mark()
    sparker = SC.Sparker(host='HPC-server', appname='ImageILSVRC-S', master='spark://HPC-server:7077')
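    # Stage 3: distribute the collected rows over 40 partitions and run info
    # extraction, embedding (rate=0.2) and feature extraction on the cluster.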
    rdd_data = sparker.sc.parallelize(list_data, 40) \
        .mapValues(lambda data: [data] + SC.rddinfo_ILS(data)) \
        .flatMap(lambda x: SC.rddembed_ILS_EXT(x, rate=0.2)) \
        .mapValues(lambda items: SC.rddfeat_ILS(items))
    timer.report()

    print '[time]writing table...'
    timer.mark()
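    # Stage 4: collect the processed rows back on the driver and write them
    # through a happybase batch that flushes every 5000 puts.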
    try:
        with table.batch(batch_size=5000) as b:
            for item in rdd_data.collect():
                imgname, imginfo = item[0], item[1]
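                # imginfo layout, matching the columns written below:
                # [data, width, height, size, capacity, quality, rate,
                #  chosen, class, features]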
                b.put(imgname,
                      {
                          'cf_pic:data': imginfo[0],
                          'cf_info:width': str(imginfo[1]),
                          'cf_info:height': str(imginfo[2]),
                          'cf_info:size': str(imginfo[3]),
                          'cf_info:capacity': str(imginfo[4]),
                          'cf_info:quality': str(imginfo[5]),
                          'cf_info:rate': str(imginfo[6]),
                          'cf_tag:chosen': str(imginfo[7]),
                          'cf_tag:class': str(imginfo[8]),
                          'cf_feat:ibd': imginfo[9],
                      })
    except ValueError:
        raise
    timer.report()