您的位置:首页 > 编程语言

Spark读写Hbase示例代码

2017-04-06 17:02 239 查看
最少需要导入hbase以下4个jar:

hbase-client
hbase-common
hbase-protocol
hbase-server

写入Hbase

def writeHbaseFromRDD (tableName:String, columnFamily:String, column:String,rdd:RDD[(String, String)]): Unit ={
val hbaseConf = HBaseConfiguration.create()
// 新旧API都可以用 ,大部分Hadoop 版本包含新旧两版的 API
/** hadoop 旧API写法*/
// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)

// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])

// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)

// rdd.map{

// case(key,value) =>

// val p = new Put(Bytes.toBytes(key))

// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))

//// p.setWriteToWAL(false)

// (new ImmutableBytesWritable,p)

// }.saveAsHadoopDataset(jobConf)

/** hadoop 新API写法*/
val job = new org.apache.hadoop.mapreduce.Job(hbaseConf)

job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

job.setOutputKeyClass(classOf[ImmutableBytesWritable])

job.setOutputValueClass(classOf[Writable])

job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,tableName)

rdd.map{
case (key,value)
=>
val p = new Put(Bytes. toBytes(key))

p.add(Bytes.toBytes(columnFamily) ,Bytes.toBytes (column), Bytes.toBytes(value))
// p.setWriteToWAL(false)
new ImmutableBytesWritable, p)

}.saveAsNewAPIHadoopDataset(job.getConfiguration)

}

读取Hbase

def readAsRDD (): Unit ={
val sparkConf = new SparkConf().setAppName( "read-hbase-test").setMaster("local" )

sparkConf.set("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" )
val sc = new SparkContext(sparkConf)
val hbaseConf = HandleHbase.conf
/** hbase 中的scan可以在这里当做参数传入*/
val scan = new Scan()

scan.setStartRow(Bytes.toBytes( "row-1"))

scan.setStopRow(Bytes.toBytes( "row-2"))
def convertScanToString (scan:
Scan) = {
val proto: ClientProtos.Scan = ProtobufUtil. toScan(scan)

Base64.encodeBytes(proto.toByteArray)

}
/** TableInputFormat 中有若干参数可以用来过滤 ,可以参考看一下TableInputFormat的静态常量*/
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat. SCAN,
convertScanToString(scan))

hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,"spark_hbase_test")
val rdd = sc.newAPIHadoopRDD(hbaseConf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat] ,
classOf[ImmutableBytesWritable] ,classOf[org.apache.hadoop.hbase.client.Result])

rdd.foreach(println)

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: