Spark读写Hbase示例代码
2016-01-20 20:30
363 查看
最少需要导入hbase以下4个jar:
hbase-client
hbase-common
hbase-protocol
hbase-server
写入Hbase
def
writeHbaseFromRDD
(tableName:String,
columnFamily:String,
column:String,
rdd:RDD[(String,
String)]):
Unit
={
val
hbaseConf = HBaseConfiguration.create()
//
新旧API都可以用
,大部分Hadoop
版本包含新旧两版的
API
/** hadoop
旧API写法*/
// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)
// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)
// rdd.map{
// case(key,value) =>
// val p = new Put(Bytes.toBytes(key))
// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))
//// p.setWriteToWAL(false)
// (new ImmutableBytesWritable,p)
// }.saveAsHadoopDataset(jobConf)
/** hadoop
新API写法*/
val
job =
new
org.apache.hadoop.mapreduce.Job(hbaseConf)
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Writable])
job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,
tableName)
rdd.map{
case
(key,value)
=>
val
p =
new
Put(Bytes.
toBytes(key))
p.add(Bytes.toBytes(columnFamily)
,Bytes.toBytes
(column),
Bytes.toBytes(value))
// p.setWriteToWAL(false)
(
new
ImmutableBytesWritable,
p)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
读取Hbase
def
readAsRDD
():
Unit
={
val
sparkConf =
new
SparkConf().setAppName(
"read-hbase-test").setMaster("local"
)
sparkConf.set("spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)
val
sc =
new
SparkContext(sparkConf)
val
hbaseConf = HandleHbase.conf
/** hbase
中的scan可以在这里当做参数传入*/
val
scan =
new
Scan()
scan.setStartRow(Bytes.toBytes(
"row-1"))
scan.setStopRow(Bytes.toBytes(
"row-2"))
def
convertScanToString
(scan: Scan) = {
val
proto: ClientProtos.Scan = ProtobufUtil.
toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
/** TableInputFormat
中有若干参数可以用来过滤
,可以参考看一下TableInputFormat的静态常量*/
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.
SCAN,
convertScanToString(scan))
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,
"spark_hbase_test")
val
rdd = sc.newAPIHadoopRDD(hbaseConf,
classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat]
,
classOf[ImmutableBytesWritable]
,classOf[org.apache.hadoop.hbase.client.Result])
rdd.foreach(println)
}
hbase-client
hbase-common
hbase-protocol
hbase-server
写入Hbase
def
writeHbaseFromRDD
(tableName:String,
columnFamily:String,
column:String,
rdd:RDD[(String,
String)]):
Unit
={
val
hbaseConf = HBaseConfiguration.create()
//
新旧API都可以用
,大部分Hadoop
版本包含新旧两版的
API
/** hadoop
旧API写法*/
// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)
// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)
// rdd.map{
// case(key,value) =>
// val p = new Put(Bytes.toBytes(key))
// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))
//// p.setWriteToWAL(false)
// (new ImmutableBytesWritable,p)
// }.saveAsHadoopDataset(jobConf)
/** hadoop
新API写法*/
val
job =
new
org.apache.hadoop.mapreduce.Job(hbaseConf)
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Writable])
job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,
tableName)
rdd.map{
case
(key,value)
=>
val
p =
new
Put(Bytes.
toBytes(key))
p.add(Bytes.toBytes(columnFamily)
,Bytes.toBytes
(column),
Bytes.toBytes(value))
// p.setWriteToWAL(false)
(
new
ImmutableBytesWritable,
p)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
读取Hbase
def
readAsRDD
():
Unit
={
val
sparkConf =
new
SparkConf().setAppName(
"read-hbase-test").setMaster("local"
)
sparkConf.set("spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)
val
sc =
new
SparkContext(sparkConf)
val
hbaseConf = HandleHbase.conf
/** hbase
中的scan可以在这里当做参数传入*/
val
scan =
new
Scan()
scan.setStartRow(Bytes.toBytes(
"row-1"))
scan.setStopRow(Bytes.toBytes(
"row-2"))
def
convertScanToString
(scan: Scan) = {
val
proto: ClientProtos.Scan = ProtobufUtil.
toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
/** TableInputFormat
中有若干参数可以用来过滤
,可以参考看一下TableInputFormat的静态常量*/
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.
SCAN,
convertScanToString(scan))
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,
"spark_hbase_test")
val
rdd = sc.newAPIHadoopRDD(hbaseConf,
classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat]
,
classOf[ImmutableBytesWritable]
,classOf[org.apache.hadoop.hbase.client.Result])
rdd.foreach(println)
}
相关文章推荐
- Java中的ReentrantLock和synchronized两种锁机制的对比
- java中的枚举
- Java中的ReentrantLock和synchronized两种锁机制的对比
- Java中的ReentrantLock和synchronized两种锁机制的对比
- struts2环境搭建
- Java多线程(1)
- 131 php 如何将某个模型的数据库信息单独配置
- 虚拟机类加载机制
- java中关于Map的三种遍历方法机putAll的用法详解
- Weblogic错误总结
- Qt5与Qt4的模块简介
- Django - Django框架 简单介绍
- 【慕课笔记】第四章 流程控制语句 第3节 JAVA条件语句之多重if
- raspberry install python-mysqldb
- FindBugs Report安全代码检查工具问题解析
- java虚拟机运行时数据区域及对象的探秘
- 模拟java死锁
- MATLAB画图-legend,box用法
- 【慕课笔记】第四章 流程控制语句 第2节 JAVA条件语句之if...else...
- C#图形验证码