#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from common import *
from dependencies import *

from pyspark import SparkConf, SparkContext

import ast
import simplejson as json


def result2dict(result):
    """
    :param result: e.g. ("row8", "f1:hog:caocaocao;f1:q1:value3")
    :return: e.g. ("row8", {"f1:hog": "caocaocao", "f1:q1": "value3"})
    """
    # Assumed parsing: split the packed cells on ';', then each cell on its
    # first two ':' into (family, qualifier, value).
    row, cells = result
    return row, dict(("%s:%s" % (f, q), v)
                     for f, q, v in (c.split(':', 2) for c in cells.split(';')))


if __name__ == "__main__":
    load_env()

    host = 'HPC-server'
    table = 'test'

    conf = SparkConf().setAppName("HBaseOutputFormat")
    print conf.getAll()
    sc = SparkContext(conf=conf)

    # Converter classes (from the Spark examples jar) that translate between
    # Python strings and the HBase TableInputFormat/TableOutputFormat types.
    hparams = dict(
        inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        readValueClass="org.apache.hadoop.hbase.client.Result",
        readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
        readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter",
        outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        # writeValueClass="org.apache.hadoop.io.Writable",
        writeValueClass="org.apache.hadoop.hbase.client.Put",
        writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
        writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
    )

    # HBase/Hadoop job configuration: ZooKeeper quorum plus the input/output
    # table and the output format, key and value classes for the write path.
    hconf = {
        "hbase.zookeeper.quorum": host,
        "hbase.mapreduce.inputtable": table,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": hparams["outputFormatClass"],
        "mapreduce.job.output.key.class": hparams["writeKeyClass"],
        "mapreduce.job.output.value.class": hparams["writeValueClass"],
    }

    # Scan the table; each record arrives as (row_key, "family:qualifier:value;...").
    hbase_rdd = sc.newAPIHadoopRDD(
        inputFormatClass=hparams["inputFormatClass"],
        keyClass=hparams["readKeyClass"],
        valueClass=hparams["readValueClass"],
        keyConverter=hparams["readKeyConverter"],
        valueConverter=hparams["readValueConverter"],
        conf=hconf)

    # Split the packed value string into one [family, qualifier, value] list per cell.
    hbase_rdd = hbase_rdd.flatMapValues(lambda v: [s.split(':') for s in v.split(';')])
    # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\t")).mapValues(ast.literal_eval).groupByKey()

    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)

    # with open("res/rdd.json", "w") as f:
    #     f.write(json.dumps(output))

    # Write path: a [row_key, family, qualifier, value] list is turned into a
    # Put by StringListToPutConverter and saved back to the same table.
    # dict_test = dict(row6="hehe", row7="cao")
    # list_test = ["row8", "f1", "hog", "caocaocao"]
    # sc.parallelize([list_test]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
    #     conf=hconf,
    #     keyConverter=hparams["writeKeyConverter"],
    #     valueConverter=hparams["writeValueConverter"])

    sc.stop()