Spark连接HBase进行读写相关操作【CDH5.7.X】
2017-10-29 21:42
267 查看
Spark连接HBase进行读写相关操作【CDH5.7.X】
文章内容:
1. 通过Spark读取HBase的表并通过转换RDD2.Spark连接HBase进行表写入操作
版本:
CDH集群版本:CDH5.7.1
Spark版本:spark-1.6.0+cdh5.7.1+193 HBase版本:hbase-1.2.0+cdh5.7.1+142 |
准备工作:
1.集群环境准备
已经安装CDH5.7.X集群集群安装Spark和HBase相关组件
2.开发环境准备
Maven工程需要添加Spark Core和HBase依赖a.HBase的Maven依赖
注意其中必须去除掉servlet-api的依赖否则在SparkContext初始化的时候会出现servlet-api版本冲突异常<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-annotations</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>jsp-api</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-api-2.1</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-prefix-tree</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-rest</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-thrift</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> </exclusions> </dependency>
b.Spark Core的Maven依赖
<!-- spark core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency>
3. 数据准备
HBase表通过hbase shell的方式创建test表,put几条测试数据create 'test','cf'
put 'test','row1','cf:name','zs'
put 'test','row1','cf:age','23'
put 'test','row2','cf:name','we'
put 'test','row2','cf:age','15'
put 'test','row3','cf:name','ls'
put 'test','row3','cf:age','42'
put 'test','row4','cf:name','zs'
put 'test','row4','cf:age','75'
4. Spark读取HBase表
第一步:创建HBaseConf//导入相关包
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableNotFoundException
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
//创建HBaseConf
val conf = HBaseConfiguration.create();
第二步:HBaseConf设置输入表"test"
conf.set(TableInputFormat.INPUT_TABLE, "test")
第三步:创建SparkContext
//注意本地设置为Spark本地运行,如果集群模式运行把setMaster("local")去掉
val sparkConf = new SparkConf().setAppName("my app").setMaster("local")
//创建SparkContext
val sc = new SparkContext(sparkConf)
第四步:读取HBaseConf转换RDD
//通过newAPIHadoopRDD方法读取hbase表转换为RDD
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
第五步:对hbaseRDD进行操作
//HBase 对test表进行统计
val testCount = hbaseRDD.count()
println("count :" + testCount) //4
//循环每一行HBase的数据
hBaseRDD.foreach{case (_,result) =>
//获取行键
val key = Bytes.toString(result.getRow)
//通过列族和列名获取列
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:" + key + " ,name:" + name + ",age :" + age)
}
//Row key: row1,name:zs,age:23
//....
5. Spark对HBase表进行写入
第一步:创建HBaseConf//导入相关包
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableNotFoundException
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
//创建HBaseConf
val conf = HBaseConfiguration.create();
第二步:HBaseConf设置输入表"test"
conf.set(TableInputFormat.INPUT_TABLE, "test")
第三步:创建SparkContext
//注意本地设置为Spark本地运行,如果集群模式运行把setMaster("local")去掉
val sparkConf = new SparkConf().setAppName("my app").setMaster("local")
//创建SparkContext
val sc = new SparkContext(conf)
第四步:创建Job,并获取Configuration
//创建Job
val job = new Job(sc.hadoopConfiguration)
//设置输出的KeyClass
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
//设置输出ValueClass
job.setOutputValueClass(classOf[Result])
//设置OutputFormat
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val hbaseWriterConf = job.getConfiguration()
第五步:创建写入HBase表的数据的RDD
//通过Array数组创建3条数据的RDD
val inDataRDD = sc.makeRDD(Array("row5,Anm,15","row6,Lily,16","row7,Mak,29"))
//RDD转换成可以进行HBase表数据写入的格式的RDD
val hbaseWriterRdd = inDataRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
(new ImmutableBytesWritable, put)
}}
第六步:数据写入HBase表
//调用saveAsNewAPIHadoopDataset写入HBase数据表
hbaseWriterRdd.saveAsNewAPIHadoopDataset(hbaseWriterConf)
第七步:关闭SparkContext
//自从写入HBase表操作完成,关闭SparkContest
sc.stop()
7. HBase写入数据检查
通过hbase shell的scan命令进行查看,如果出现下图情况,证明数据已经写入正常scan 'test'
相关文章推荐
- jdbc 连接数据库并进行操作相关代码
- Hbase总结(三)--使用spring-data-hadoop进行hbase的读写操作
- Android 连接Mysql进行相关操作——用PHP做中介以及JSON做数据交换
- [置顶] Java API连接HBase 进行增删查改操作
- Java 连接hbase 进行简单操作
- SpringBoot中连接MYSQL数据库,并使用JPA进行数据库的相关操作
- Android 连接Mysql进行相关操作——用PHP做中介以及JSON做数据交换
- 从jsp页面中连接数据库并进行相关操作(增删查改)
- win7系统eclipse远程连接虚拟机中的centos6.5,对hbase进行操作
- SpringBoot中连接MYSQL数据库,并使用JPA进行数据库的相关操作
- 利用ADO连接ACCESS数据库,并进行读写操作(MFC)
- Python 使用 Thrift 连接 HBASE 进行操作
- 利用JDK7的NIO2.0进行I/O读写和文件操作监控
- 如何验证本地磁盘或网络磁盘是否可以进行读写操作
- 对外设进行读写操作的过程
- Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询
- 循环体内包含了大量没有必要在循环中处理的语句或获取数据库连接或进行不必要的try-catch操作
- 一起学spark(8) -- 针对两个pair rdd 的连接操作以及pair RDD 的行动操作
- 一起学spark(8) -- 针对两个pair rdd 的连接操作以及pair RDD 的行动操作
- Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询