您的位置:首页 > 编程语言

Spark读写Hbase示例代码

2016-01-20 20:30 363 查看
最少需要导入hbase以下4个jar:

hbase-client
hbase-common
hbase-protocol
hbase-server

写入Hbase

def
writeHbaseFromRDD
(tableName:String,
columnFamily:String,
column:String,
rdd:RDD[(String,
String)]):
Unit
={

val
hbaseConf = HBaseConfiguration.create()

//
新旧API都可以用
,大部分Hadoop
版本包含新旧两版的
API

/** hadoop
旧API写法*/

// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)

// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])

// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)

// rdd.map{

// case(key,value) =>

// val p = new Put(Bytes.toBytes(key))

// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))

//// p.setWriteToWAL(false)

// (new ImmutableBytesWritable,p)

// }.saveAsHadoopDataset(jobConf)

/** hadoop
新API写法*/

val
job =
new
org.apache.hadoop.mapreduce.Job(hbaseConf)

job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

job.setOutputKeyClass(classOf[ImmutableBytesWritable])

job.setOutputValueClass(classOf[Writable])

job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,
tableName)

rdd.map{

case
(key,value)
=>

val
p =
new
Put(Bytes.
toBytes(key))

p.add(Bytes.toBytes(columnFamily)
,Bytes.toBytes
(column),
Bytes.toBytes(value))

// p.setWriteToWAL(false)

(
new
ImmutableBytesWritable,
p)

}.saveAsNewAPIHadoopDataset(job.getConfiguration)

}

读取Hbase
def
readAsRDD
():
Unit
={

val
sparkConf =
new
SparkConf().setAppName(
"read-hbase-test").setMaster("local"
)

sparkConf.set("spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)

val
sc =
new
SparkContext(sparkConf)

val
hbaseConf = HandleHbase.conf

/** hbase
中的scan可以在这里当做参数传入*/

val
scan =
new
Scan()

scan.setStartRow(Bytes.toBytes(
"row-1"))

scan.setStopRow(Bytes.toBytes(
"row-2"))

def
convertScanToString
(scan: Scan) = {

val
proto: ClientProtos.Scan = ProtobufUtil.
toScan(scan)

Base64.encodeBytes(proto.toByteArray)

}

/** TableInputFormat
中有若干参数可以用来过滤
,可以参考看一下TableInputFormat的静态常量*/

hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.
SCAN,

convertScanToString(scan))

hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,
"spark_hbase_test")

val
rdd = sc.newAPIHadoopRDD(hbaseConf,
classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat]
,

classOf[ImmutableBytesWritable]
,classOf[org.apache.hadoop.hbase.client.Result])

rdd.foreach(println)

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: