From f69baeb66d0df4b82e33012523d21133c76cbe9a Mon Sep 17 00:00:00 2001 From: Chunk Date: Tue, 10 Mar 2015 17:33:55 +0800 Subject: [PATCH] spark streaming init. --- .idea/ImageR.iml | 2 +- common.py | 8 ++++++-- mspark/SC.py | 20 ++------------------ mspark/SSC.py | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ mspark/__init__.py | 17 +++++++++++++++++ 5 files changed, 87 insertions(+), 21 deletions(-) create mode 100644 mspark/SSC.py diff --git a/.idea/ImageR.iml b/.idea/ImageR.iml index 6d00e52..4f21c2d 100644 --- a/.idea/ImageR.iml +++ b/.idea/ImageR.iml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/common.py b/common.py index fbefc32..15aee1e 100644 --- a/common.py +++ b/common.py @@ -14,6 +14,8 @@ import ConfigParser import numpy as np +package_dir = os.path.dirname(os.path.abspath(__file__)) + class Timer(): def __init__(self): @@ -66,7 +68,8 @@ def get_env_variable(var_name, default=False): import StringIO import ConfigParser - env_file = os.environ.get('PROJECT_ENV_FILE', "res/.env") + res_envfile = os.path.join(package_dir, 'res', '.env') + env_file = os.environ.get('PROJECT_ENV_FILE', res_envfile) try: config = StringIO.StringIO() config.write("[DATA]\n") @@ -95,7 +98,8 @@ def get_env_variable(var_name, default=False): def load_env(default=False): - env_file = os.environ.get('PROJECT_ENV_FILE', "res/.env") + res_envfile = os.path.join(package_dir, 'res', '.env') + env_file = os.environ.get('PROJECT_ENV_FILE', res_envfile) try: config = StringIO.StringIO() config.write("[DATA]\n") diff --git a/mspark/SC.py b/mspark/SC.py index cda6f64..d0535d1 100644 --- a/mspark/SC.py +++ b/mspark/SC.py @@ -1,9 +1,10 @@ __author__ = 'chunk' from ..common import * +from .dependencies import * +from . import * import sys -from dependencies import * from pyspark import SparkConf, SparkContext from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD from pyspark.mllib.regression import LabeledPoint @@ -12,23 +13,6 @@ import json import pickle -hparams = dict( - inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat", - readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable", - readValueClass="org.apache.hadoop.hbase.client.Result", - readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", - readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter", - - outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat", - writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable", - # writeValueClass="org.apache.hadoop.io.Writable", - writeValueClass="org.apache.hadoop.hbase.client.Put", - writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", - writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter", - -) - - def parse_cv(raw_row): """ input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True') diff --git a/mspark/SSC.py b/mspark/SSC.py new file mode 100644 index 0000000..806df70 --- /dev/null +++ b/mspark/SSC.py @@ -0,0 +1,61 @@ +__author__ = 'chunk' + +from ..common import * +from . import * +from .dependencies import * +from .SC import * + +import sys +from pyspark import SparkConf, SparkContext +from pyspark.streaming import StreamingContext + + +class StreamSparker(Sparker): + def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost', port=9999, **kwargs): + Sparker.__init__(self, host, appname) + + self.source = source + self.port = port + self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1) + + def start(self): + self.ssc.start() + self.ssc.awaitTermination() + + + def set_datasource(self, source='localhost', port=9999): + self.source = source + self.port = port + + def _word_count(self): + lines = self.ssc.socketTextStream(self.source, self.port) + words = lines.flatMap(lambda line: line.split(" ")) + pairs = words.map(lambda word: (word, 1)) + wordCounts = pairs.reduceByKey(lambda x, y: x + y) + + wordCounts.pprint() + + self.start() + + + + + + + + + + + + + + + + + + + + + + + diff --git a/mspark/__init__.py b/mspark/__init__.py index a1459cf..ffd22fd 100644 --- a/mspark/__init__.py +++ b/mspark/__init__.py @@ -1 +1,18 @@ __author__ = 'chunk' + +hparams = dict( + inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat", + readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable", + readValueClass="org.apache.hadoop.hbase.client.Result", + readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", + readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter", + + outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat", + writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable", + # writeValueClass="org.apache.hadoop.io.Writable", + writeValueClass="org.apache.hadoop.hbase.client.Put", + writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", + writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter", + +) + -- libgit2 0.21.2