Commit f69baeb66d0df4b82e33012523d21133c76cbe9a

Authored by Chunk
1 parent ca73c96f

Spark Streaming init.

.idea/ImageR.iml
... ... @@ -2,7 +2,7 @@
2 2 <module type="PYTHON_MODULE" version="4">
3 3 <component name="NewModuleRootManager">
4 4 <content url="file://$MODULE_DIR$" />
5   - <orderEntry type="jdk" jdkName="Python 2.7.8 virtualenv at ~/.virtualenvs/env1" jdkType="Python SDK" />
  5 + <orderEntry type="jdk" jdkName="Python 2.7.6 virtualenv at ~/.virtualenvs/env0" jdkType="Python SDK" />
6 6 <orderEntry type="sourceFolder" forTests="false" />
7 7 </component>
8 8 </module>
9 9 \ No newline at end of file
... ...
common.py
... ... @@ -14,6 +14,8 @@ import ConfigParser
14 14  
15 15 import numpy as np
16 16  
  17 +package_dir = os.path.dirname(os.path.abspath(__file__))
  18 +
17 19  
18 20 class Timer():
19 21 def __init__(self):
... ... @@ -66,7 +68,8 @@ def get_env_variable(var_name, default=False):
66 68 import StringIO
67 69 import ConfigParser
68 70  
69   - env_file = os.environ.get('PROJECT_ENV_FILE', "res/.env")
  71 + res_envfile = os.path.join(package_dir, 'res', '.env')
  72 + env_file = os.environ.get('PROJECT_ENV_FILE', res_envfile)
70 73 try:
71 74 config = StringIO.StringIO()
72 75 config.write("[DATA]\n")
... ... @@ -95,7 +98,8 @@ def get_env_variable(var_name, default=False):
95 98  
96 99  
97 100 def load_env(default=False):
98   - env_file = os.environ.get('PROJECT_ENV_FILE', "res/.env")
  101 + res_envfile = os.path.join(package_dir, 'res', '.env')
  102 + env_file = os.environ.get('PROJECT_ENV_FILE', res_envfile)
99 103 try:
100 104 config = StringIO.StringIO()
101 105 config.write("[DATA]\n")
... ...
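
The two hunks above switch the `.env` lookup from a CWD-relative path to one anchored at the module's own directory. A minimal sketch of the effect (the `PROJECT_ENV_FILE` override is kept; `package_dir` is the value computed once at import time):

    import os

    package_dir = os.path.dirname(os.path.abspath(__file__))

    # Resolves to <package>/res/.env no matter where the interpreter was
    # launched, while still honouring an explicit PROJECT_ENV_FILE override.
    env_file = os.environ.get('PROJECT_ENV_FILE',
                              os.path.join(package_dir, 'res', '.env'))
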
mspark/SC.py
1 1 __author__ = 'chunk'
2 2  
3 3 from ..common import *
  4 +from .dependencies import *
  5 +from . import *
4 6  
5 7 import sys
6   -from dependencies import *
7 8 from pyspark import SparkConf, SparkContext
8 9 from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD
9 10 from pyspark.mllib.regression import LabeledPoint
... ... @@ -12,23 +13,6 @@ import json
12 13 import pickle
13 14  
14 15  
15   -hparams = dict(
16   - inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
17   - readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
18   - readValueClass="org.apache.hadoop.hbase.client.Result",
19   - readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
20   - readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter",
21   -
22   - outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
23   - writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
24   - # writeValueClass="org.apache.hadoop.io.Writable",
25   - writeValueClass="org.apache.hadoop.hbase.client.Put",
26   - writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
27   - writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
28   -
29   -)
30   -
31   -
32 16 def parse_cv(raw_row):
33 17 """
34 18 input: (u'key0',u'cf_feat:hog:[0.056273,...]--%--cf_pic:data:\ufffd\ufffd\...--%--cf_tag:hog:True')
... ...
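
`parse_cv` consumes HBase rows that the converters render as `(key, 'family:qualifier:value--%--...')` strings, per its docstring. A hypothetical splitter for that format, for illustration only (`split_hbase_row` is not in the repo, and the real `parse_cv` also decodes the per-qualifier payloads):

    def split_hbase_row(raw_row):
        # raw_row: (u'key0', u'cf_feat:hog:[0.056273,...]--%--cf_tag:hog:True')
        key, raw = raw_row
        cells = {}
        for cell in raw.split('--%--'):
            # 'cf_feat:hog:[...]' -> family, qualifier, raw value string
            family, qualifier, value = cell.split(':', 2)
            cells[(family, qualifier)] = value
        return key, cells
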
mspark/SSC.py 0 → 100644
... ... @@ -0,0 +1,61 @@
  1 +__author__ = 'chunk'
  2 +
  3 +from ..common import *
  4 +from . import *
  5 +from .dependencies import *
  6 +from .SC import *
  7 +
  8 +import sys
  9 +from pyspark import SparkConf, SparkContext
  10 +from pyspark.streaming import StreamingContext
  11 +
  12 +
  13 +class StreamSparker(Sparker):
  14 + def __init__(self, host='HPC-server', appname='NewPySparkStreamingApp', source='localhost', port=9999, **kwargs):
  15 + Sparker.__init__(self, host, appname)
  16 +
  17 + self.source = source
  18 + self.port = port
  19 + self.ssc = StreamingContext(sparkContext=self.sc, batchDuration=1)
  20 +
  21 + def start(self):
  22 + self.ssc.start()
  23 + self.ssc.awaitTermination()
  24 +
  25 +
  26 + def set_datasource(self, source='localhost', port=9999):
  27 + self.source = source
  28 + self.port = port
  29 +
  30 + def _word_count(self):
  31 + lines = self.ssc.socketTextStream(self.source, self.port)
  32 + words = lines.flatMap(lambda line: line.split(" "))
  33 + pairs = words.map(lambda word: (word, 1))
  34 + wordCounts = pairs.reduceByKey(lambda x, y: x + y)
  35 +
  36 + wordCounts.pprint()
  37 +
  38 + self.start()
  39 +
... ...
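
`StreamSparker` wires a socket text stream into the existing `Sparker` session and blocks in `start()` until the streaming job terminates. A minimal usage sketch, assuming the `Sparker` base class from `mspark.SC` reaches the cluster named by `host`; feed the socket first with `nc -lk 9999`:

    from mspark.SSC import StreamSparker

    sparker = StreamSparker(host='HPC-server', appname='WordCountDemo',
                            source='localhost', port=9999)
    sparker._word_count()  # builds the DStream pipeline, then blocks in start()
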
mspark/__init__.py
1 1 __author__ = 'chunk'
  2 +
  3 +hparams = dict(
  4 + inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
  5 + readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
  6 + readValueClass="org.apache.hadoop.hbase.client.Result",
  7 + readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
  8 + readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter",
  9 +
  10 + outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
  11 + writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
  12 + # writeValueClass="org.apache.hadoop.io.Writable",
  13 + writeValueClass="org.apache.hadoop.hbase.client.Put",
  14 + writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
  15 + writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
  16 +
  17 +)
  18 +
... ...
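
With `hparams` hoisted out of `SC.py` into the package `__init__`, both `SC` and `SSC` share one HBase converter table. A hedged sketch of a table read through it on the PySpark 1.x API; the table name, ZooKeeper quorum, and the live `SparkContext` `sc` are assumptions, not repo code:

    from mspark import hparams

    hbase_conf = {
        "hbase.zookeeper.quorum": "HPC-server",    # assumed quorum host
        "hbase.mapreduce.inputtable": "ImgIndex",  # assumed table name
    }
    rdd = sc.newAPIHadoopRDD(hparams['inputFormatClass'],
                             hparams['readKeyClass'],
                             hparams['readValueClass'],
                             keyConverter=hparams['readKeyConverter'],
                             valueConverter=hparams['readValueConverter'],
                             conf=hbase_conf)
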