# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
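#
# For example (the module path and priority values below are assumptions for
# illustration, not the project's actual settings):
#
#     ITEM_PIPELINES = {
#         'mspider.pipelines.MspiderDBPipeline': 300,
#     }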

from .items import LarvaeItem

import happybase
import logging
import socket
import pickle
import json
import cjson


class HbaseDumper(object):
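    """
    Thin helper that writes scraped items into an HBase table via happybase.
    """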
    def __init__(self, Item, tablename=None):
        self.Item = Item  # the item *class*, not an instance
        self.table_name = tablename if tablename is not None else self.Item.__name__
        self.table = None
        self.connection = None
        self.sparkcontex = None

    # @deprecated
    def get_table_orig(self):
        if self.table is not None:
            return self.table

        if self.connection is None:
            c = happybase.Connection('HPC-server')
            self.connection = c

        tables = self.connection.tables()
        if self.table_name not in tables:
            # one column family per declared item field
            families = self.Item.fields
            self.connection.create_table(name=self.table_name, families=families)

        table = self.connection.table(name=self.table_name)

        self.table = table

        return table

    def get_table(self):
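        """Lazily open the happybase connection and create the table on first use."""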
        if self.table is not None:
            return self.table

        if self.connection is None:
            c = happybase.Connection('HPC-server')
            self.connection = c

        tables = self.connection.tables()
        if self.table_name not in tables:
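            # column families: raw picture data, item info (versioned), tags, extracted features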
            families = {'cf_pic': dict(),
                        'cf_info': dict(max_versions=10),
                        'cf_tag': dict(),
                        'cf_feat': dict(),
                        }
            self.connection.create_table(name=self.table_name, families=families)

        table = self.connection.table(name=self.table_name)

        self.table = table

        return table

    def store_item(self, item):
        if self.table is None:
            self.table = self.get_table()
        # alternative: write every item field as its own column
        # data = {}
        # for key in item.keys():
        #     data[key + ':'] = item[key]
        # self.table.put(item['id'], data)
        self.table.put(item['id'], {'cf_pic:data': item['data']})

    # @TODO: Bulk put
    def store_items(self, items):
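        """Bulk-write a batch of items, one HBase row per item."""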
        if self.table is None:
            self.table = self.get_table()

        dict_databuf = {}
        for item in items:
            data = {}
            for key in item.keys():
                data[key + ':'] = item[key]
            dict_databuf[item['id']] = data

        try:
            with self.table.batch(batch_size=5000) as b:
                for rowkey, data in dict_databuf.items():
                    b.put(rowkey, data)
        except ValueError:
            # let batching errors propagate to the caller
            raise


class MspiderDBPipeline(object):
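    """Pipeline that persists each scraped item into the 'MSPIDER' HBase table."""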
    logger = logging.getLogger("mspider")

    def __init__(self):
        self.hbasedumper = HbaseDumper(LarvaeItem, tablename='MSPIDER')

        self.logger.setLevel(logging.DEBUG)
        if not self.logger.handlers:  # avoid adding duplicate handlers across instances
            hd = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s")
            hd.setFormatter(formatter)
            self.logger.addHandler(hd)

    def process_item(self, item, spider):
        try:
            self.logger.info("scraping <%s>" % item['link'])
            self.hbasedumper.store_item(item)  # one by one
            # self.hbasedumper.store_items([item])  # bulk put (expects a list of items)
        except Exception:
            self.logger.exception("failed to store <%s>" % item.get('link'))
            raise
        return item


class MspiderStreamPipeline(object):
    """
    cjson is awsome!!!
    """
    logger = logging.getLogger("mspider")

    def __init__(self, host='192.168.2.118', port=9999):
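        # default address of the downstream consumer that receives the item stream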
        self.host, self.port = host, port

        self.logger.setLevel(logging.DEBUG)
        if not self.logger.handlers:  # avoid adding duplicate handlers across instances
            hd = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s")
            hd.setFormatter(formatter)
            self.logger.addHandler(hd)

    # def open_spider(self, spider):
    #     self.sock = socket.socket()
    #     self.sock.connect((self.host, self.port))
    #
    # def close_spider(self, spider):
    #     self.sock.close()

    def _client(self, item):
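        """Act as a TCP client: connect to host:port and push one serialised item."""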
        try:
            self.logger.info("scraping <%s>" % item['link'])

            self.sock = socket.socket()
            self.sock.connect((self.host, self.port))

            # pk_item = pickle.dumps(dict(item))
            # re-encode the raw bytes as valid UTF-8 so cjson can serialise them
            item['data'] = item['data'].decode('latin1').encode('utf8')
            # pk_item = json.dumps(dict(item), encoding='utf-8') + "\n"
            pk_item = cjson.encode(dict(item)) + "\n"

            self.sock.sendall(pk_item)
        finally:
            self.sock.close()

    def _server(self, item):
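        """Act as a one-shot TCP server: accept a connection and push the item to it."""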
        try:
            self.logger.info("scraping <%s>" % item['link'])

            self.sock = socket.socket()
            self.sock.bind((self.host, self.port))
            self.sock.listen(1)
            conn, addr = self.sock.accept()
            self.logger.info("connected by %s" % str(addr))

            # pk_item = pickle.dumps(dict(item))
            # re-encode the raw bytes as valid UTF-8 so cjson can serialise them
            item['data'] = item['data'].decode('latin1').encode('utf8')
            # pk_item = json.dumps(dict(item), encoding='utf-8') + "\n"
            pk_item = cjson.encode(dict(item)) + "\n"

            conn.sendall(pk_item)
            conn.close()
        finally:
            self.sock.close()

    def process_item(self, item, spider):
        # self._client(item)  # alternative: connect out instead of listening

        self._server(item)

        return item