Spark读写Hbase示例代码
2017-04-06 17:02
239 查看
最少需要导入hbase以下4个jar:
hbase-client
hbase-common
hbase-protocol
hbase-server
写入Hbase
def writeHbaseFromRDD (tableName:String, columnFamily:String, column:String,rdd:RDD[(String, String)]): Unit ={
val hbaseConf = HBaseConfiguration.create()
// 新旧API都可以用 ,大部分Hadoop 版本包含新旧两版的 API
/** hadoop 旧API写法*/
// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)
// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)
// rdd.map{
// case(key,value) =>
// val p = new Put(Bytes.toBytes(key))
// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))
//// p.setWriteToWAL(false)
// (new ImmutableBytesWritable,p)
// }.saveAsHadoopDataset(jobConf)
/** hadoop 新API写法*/
val job = new org.apache.hadoop.mapreduce.Job(hbaseConf)
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Writable])
job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,tableName)
rdd.map{
case (key,value)
=>
val p = new Put(Bytes. toBytes(key))
p.add(Bytes.toBytes(columnFamily) ,Bytes.toBytes (column), Bytes.toBytes(value))
// p.setWriteToWAL(false)
( new ImmutableBytesWritable, p)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
读取Hbase
def readAsRDD (): Unit ={
val sparkConf = new SparkConf().setAppName( "read-hbase-test").setMaster("local" )
sparkConf.set("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" )
val sc = new SparkContext(sparkConf)
val hbaseConf = HandleHbase.conf
/** hbase 中的scan可以在这里当做参数传入*/
val scan = new Scan()
scan.setStartRow(Bytes.toBytes( "row-1"))
scan.setStopRow(Bytes.toBytes( "row-2"))
def convertScanToString (scan:
Scan) = {
val proto: ClientProtos.Scan = ProtobufUtil. toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
/** TableInputFormat 中有若干参数可以用来过滤 ,可以参考看一下TableInputFormat的静态常量*/
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat. SCAN,
convertScanToString(scan))
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,"spark_hbase_test")
val rdd = sc.newAPIHadoopRDD(hbaseConf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat] ,
classOf[ImmutableBytesWritable] ,classOf[org.apache.hadoop.hbase.client.Result])
rdd.foreach(println)
}
hbase-client
hbase-common
hbase-protocol
hbase-server
写入Hbase
def writeHbaseFromRDD (tableName:String, columnFamily:String, column:String,rdd:RDD[(String, String)]): Unit ={
val hbaseConf = HBaseConfiguration.create()
// 新旧API都可以用 ,大部分Hadoop 版本包含新旧两版的 API
/** hadoop 旧API写法*/
// val jobConf = new org.apache.hadoop.mapred.JobConf(hbaseConf)
// jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
// jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE,tableName)
// rdd.map{
// case(key,value) =>
// val p = new Put(Bytes.toBytes(key))
// p.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))
//// p.setWriteToWAL(false)
// (new ImmutableBytesWritable,p)
// }.saveAsHadoopDataset(jobConf)
/** hadoop 新API写法*/
val job = new org.apache.hadoop.mapreduce.Job(hbaseConf)
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Writable])
job.getConfiguration.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE,tableName)
rdd.map{
case (key,value)
=>
val p = new Put(Bytes. toBytes(key))
p.add(Bytes.toBytes(columnFamily) ,Bytes.toBytes (column), Bytes.toBytes(value))
// p.setWriteToWAL(false)
( new ImmutableBytesWritable, p)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
读取Hbase
def readAsRDD (): Unit ={
val sparkConf = new SparkConf().setAppName( "read-hbase-test").setMaster("local" )
sparkConf.set("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" )
val sc = new SparkContext(sparkConf)
val hbaseConf = HandleHbase.conf
/** hbase 中的scan可以在这里当做参数传入*/
val scan = new Scan()
scan.setStartRow(Bytes.toBytes( "row-1"))
scan.setStopRow(Bytes.toBytes( "row-2"))
def convertScanToString (scan:
Scan) = {
val proto: ClientProtos.Scan = ProtobufUtil. toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
/** TableInputFormat 中有若干参数可以用来过滤 ,可以参考看一下TableInputFormat的静态常量*/
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat. SCAN,
convertScanToString(scan))
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,"spark_hbase_test")
val rdd = sc.newAPIHadoopRDD(hbaseConf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat] ,
classOf[ImmutableBytesWritable] ,classOf[org.apache.hadoop.hbase.client.Result])
rdd.foreach(println)
}
相关文章推荐
- Spark读写Hbase示例代码
- spark源码阅读一-spark读写hbase代码分析
- 在Eclipse中运行JAVA代码远程操作HBase的示例
- 【甘道夫】Eclipse+Maven搭建HBase开发环境及HBaseDAO代码示例
- shapefile格式说明及读写代码示例(C++)(转)
- shapefile格式说明及读写代码示例(C++)(转)
- shapefile格式说明及读写代码示例
- shapefile格式说明及读写代码示例 --http://www.gispower.org/article/arcgis/arcother/2008/48/0848115049GB922C13K2I22H7G06A4.html
- MapReduce从HBase读写数据简单示例
- javascript读写TEXT文本文件示例代码
- Michael G. Noll:整合Kafka到Spark Streaming——代码示例和挑战
- C#.NET示例读写xml所有节点的代码实现方法和读取xml节点的数据总结
- tinyxml读写xml示例代码
- C/C++读写注册表中二进制数据【代码示例】
- MapReduce从HBase读写数据简单示例
- C#.NET示例读写xml所有节点的代码实现方法和读取xml节点的数据总结
- 集算器读写EXCEL文件的代码示例
- ASP.NET中读写cookie数据示例代码
- c#/ASP.NET操作cookie(读写)代码示例