#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Read an HBase table through Spark's newAPIHadoopRDD and print its rows.

Connects to the HBase cluster whose ZooKeeper quorum runs on ``host``,
scans ``table`` via the Hadoop ``TableInputFormat``, and prints every
(row-key, row-string) pair collected back to the driver.
"""

# print() call syntax: keeps Python 2 behavior while parsing under Python 3.
from __future__ import print_function

import sys

from common import *
from dependencies import *

from pyspark import SparkConf, SparkContext


if __name__ == "__main__":
    # Environment setup helper from the project's common/dependencies modules;
    # presumably must run before the SparkContext is created — TODO confirm.
    load_env()

    # NOTE(review): HBase endpoint and table name are hard-coded; consider
    # reading them from sys.argv for reuse against other clusters/tables.
    host = 'HPC-server'
    table = 'test'

    conf = SparkConf().setAppName("HBaseInputFormat")
    print(conf.getAll())
    sc = SparkContext(conf=conf)

    # Hadoop InputFormat/OutputFormat classes plus the PySpark converter
    # classes that translate HBase keys/rows to and from Python strings.
    # Only the read* entries are used below; the write* entries are kept for
    # symmetry with a corresponding output job.
    hparams = dict(
        inputFormatClass="org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        readKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        readValueClass="org.apache.hadoop.hbase.client.Result",
        readKeyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
        readValueConverter="org.apache.spark.examples.pythonconverters.CustomHBaseResultToStringConverter",
        outputFormatClass="org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        writeKeyClass="org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        # writeValueClass="org.apache.hadoop.io.Writable",
        writeValueClass="org.apache.hadoop.hbase.client.Put",
        writeKeyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
        writeValueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter",
    )

    # Hadoop job configuration: where ZooKeeper (hence HBase) lives and which
    # table to scan.
    hconf = {"hbase.zookeeper.quorum": host,
             "hbase.mapreduce.inputtable": table,
             }

    # Scan the whole table as an RDD of (row-key, row-value-string) pairs.
    hbase_rdd = sc.newAPIHadoopRDD(inputFormatClass=hparams["inputFormatClass"],
                                   keyClass=hparams["readKeyClass"],
                                   valueClass=hparams["readValueClass"],
                                   keyConverter=hparams["readKeyConverter"],
                                   valueConverter=hparams["readValueConverter"],
                                   conf=hconf)

    # NOTE(review): collect() pulls the entire table into driver memory —
    # acceptable for a demo table, unsafe for large ones.
    output = hbase_rdd.collect()
    for (k, v) in output:
        print((k, v))

    sc.stop()