Hbase 整合 Hadoop 的数据迁移
上篇文章说了 Hbase 的基础架构,都是比较理论的知识,最近我也一直在搞 Hbase 的数据迁移, 今天就来一篇实战型的,把最近一段时间的 Hbase 整合 Hadoop 的基础知识在梳理一遍,毕竟当初搞得时候还是有点摸不着方向,写下来也方便以后查阅。
之前使用 Hbase 大多是把它当做实时数据库来做查询使用的,大部分使用的都是 Hbase 的基础 Api, Hbase 与 Hadoop Hive 框架的整合还真是没系统的搞过,话不多说,先看看本文的架构图:
PS:文中提到的代码见最后 参考资料
着重点在前两部分,后面的都是大家比较熟悉的部分了。
1 Hbase 与 Hadoop 集成
Hbase 与 Hadoop 相关操作主要可以分为如下三种情况:
-
一张 hbase 表数据导入另一张 hbase 表
-
HDFS 数据导入 Hbase 表
-
HDFS 数据(超大数据)导入 Hbase 表
以上三种情况的数据迁移基本都是依靠 MR 程序来完成的,所以重点又回到了 MR 编程。
01 hbase表数据导入
思路:准备 MR 程序将一张 Hbase 表写入到另一张 Hbase 表即可。
注意:两张 Hbase 表导入数据的列族信息要一致;有数据的 Hbase 在读入数据时要注意非空判断。
准备工作:
准备 user1 表 列族 为 f1,f1 中有 age ,name属性 ,作为输入表;
准备 user2 表,创建列族 f1,作为输出表。
主要代码:
Mapper 端:这里注意继承的 是 TableMapper
public class HBaseReadMapper extends TableMapper<Text,Put> { /** * * @param key rowkey * @param value rowkey 此行的数据 Result 类型 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //获得rowkey 的字节数组 byte[] rowkey_bytes = key.get(); String rowKeyStr = Bytes.toString(rowkey_bytes); //准备好 put 对象 用于输出下游 Put put = new Put(rowkey_bytes); //text 作为输出的 key Text text = new Text(rowKeyStr); //输出数据 - 写数据 - 普通 构建put 对象 Cell[] cells = value.rawCells(); //将 f1 : name & age 输出 for (Cell cell : cells) { //当前 cell是否是 f1 //获取列族 byte[] family = CellUtil.cloneFamily(cell); String familyStr = Bytes.toString(family); if("f1".equals(familyStr)){ //在判断是否是 name | age put.add(cell); } if("f2".equals(familyStr)){ put.add(cell); } } //注意非空判断 不然会报错 if(!put.isEmpty()){ context.write(text,put); } } }
Reduce 端 ,使用 TableReducer:
public class HbaseWriteReducer extends TableReducer<Text,Put,ImmutableBytesWritable> { /** * 将 map 传过来的数据写出去 * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //设置rowkey ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); //设置rowkey immutableBytesWritable.set(key.toString().getBytes()); for (Put value : values) { context.write(immutableBytesWritable,value); } } }
启动类,将 user1 中 f1 列族下 age,name数值写入到 user2 中:
public class Hbase2HbaseMR extends Configured implements Tool { public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); //设置 hbase 的zk地址 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); int run = ToolRunner.run(configuration, new Hbase2HbaseMR(), args); System.exit(run); } @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf()); job.setJarByClass(Hbase2HbaseMR.class); //mapper TableMapReduceUtil.initTableMapperJob(TableName.valueOf("user"),new Scan(), HBaseReadMapper.class,Text.class,Put.class,job); //reducer TableMapReduceUtil.initTableReducerJob("user2",HbaseWriteReducer.class,job); boolean b = job.waitForCompletion(true); return b?0:1; } }
02 HDFS 导入到Hbase
思路:准备 MR 程序将 HDFS 数据写入到另一张 Hbase 表即可。
注意:
读入的是 Mapper 是 HDFS 操作,写出的 Reduce 是 Hbase 操作;
HDFS 数据格式要与 Hbase 表对应
准备工作:
准备 HDFS 上数据 ;
准备 user2 表,创建列族 f1,作为输出表。
主要代码:
Mapper 端,使用常规 Mapper
public class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable>{ /** * HDFS -- Hbase * * @param key * @param value * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //数据原样输出 context.write(value,NullWritable.get()); } }
Reduce 端,使用 TableReducer :
public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { /** * key --> 一行数据 * 样例数据: * 07 zhangsan 18 * 08 lisi 25 * 09 wangwu 20 * */ //按格式拆分 String[] split = key.toString().split("\t"); //构建 put 对象 Put put = new Put(Bytes.toBytes(split[0])); put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes()); put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes()); context.write(new ImmutableBytesWritable(split[0].getBytes()),put); } }
启动类:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = HBaseConfiguration.create(); //设置 hbase zk 地址 configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); Job job = Job.getInstance(configuration); job.setJarByClass(Hdfs2HbaseMR.class); //输入文件路径 FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input")); job.setMapperClass(HdfsMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //指定输出到 Hbase 的 表名 TableMapReduceUtil.initTableReducerJob("user2",HBASEReducer.class,job); //设置 reduce 个数 job.setNumReduceTasks(1); boolean b = job.waitForCompletion(true); System.exit(b?0:1); }
03 HDFS 大数据导入Hbase
思路:与 2 中的数据导入不同的是这次的数据量比较大,使用常规的 MR 可能耗时非常的长,并且一直占用资源。
我们可以先将 Hadoop 上存储的 HDFS 文件转换成 HFile 文件,HFile 文件就是 Hbase 底层存储的类型,转换完成后,再将转换好的 HFile 文件指定给对应的 Hbase 表即可。这就是 bulkload 的方式批量加载数据,大致流程如下:
注意:
由于是文件类型转换,不做计算操作,所以只需要读入的 Mapper 操作,,不需要Reduce操作;
文件类型转换后 还需要做 Hbase 表与 HFile 文件的映射
准备工作:
准备 HDFS 上数据 ;
准备 user2 表,创建列族 f1,作为输出表。
主要代码:
Mapper 端,使用常规 Mapper
public class Hdfs2HFileMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); //封装输出类型 Put put = new Put(split[0].getBytes()); put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes()); put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes()); // 将封装好的put对象输出,rowkey 使用 immutableBytesWritable context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put); } }
启动类:
/** * * 将HDFS文件写成Hfile格式输出 */ public class Hdfs2HileOut extends Configured implements Tool { public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); int run = ToolRunner.run(configuration, new Hdfs2HileOut(), args); System.exit(run); } @Override public int run(String[] strings) throws Exception { Configuration conf = super.getConf(); Job job = Job.getInstance(conf); job.setJarByClass(Hdfs2HileOut.class); FileInputFormat.addInputPath(job,new Path("hdfs://hadoop102:9000/hbase/input")); job.setMapperClass(Hdfs2HFileMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("user2")); //使MR可以向user2表中,增量增加数据 HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("user2"))); //数据写回到HDFS 写成HFILE -》 所以指定输出格式为Hfile job.setOutputFormatClass(HFileOutputFormat2.class); //HFile 输出的路径,用于与表映射的输入参数 HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop102:9000/hbase/out_hfile2")); //开始执行 boolean b = job.waitForCompletion(true); return b? 0: 1; } }
加载类:
public class LoadHFile2Hbase { public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181,hadoop104:2181"); //获取数据库连接 Connection connection = ConnectionFactory.createConnection(configuration); Table table = connection.getTable(TableName.valueOf("user2")); //构建 LoadIncrementalHfiles 加载 Hfile文件 LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(configuration); // 加载上一步输出的HFile 与表做映射 loadIncrementalHFiles.doBulkLoad(new Path("hdfs://hadoop102:9000/hbase/out_hfile2"),connection.getAdmin(),table,connection.getRegionLocator(TableName.valueOf("user2"))); } }
至此,HDFS 数据迁移至 Hbase 完成。
2 Hbase 与 Hive 集成
hbase 与 hive 相关的数据迁移工作分为两种:
-
hive 表结果 ---> hbase 表
-
hbase 表数据 ---> hive 表
这部分操作没有代码,在 hive 和 hbase 客户端就能完成操作
01 准备工作
1 首先需要将 Hbase下的5个包拷贝到 hive lib 下,建议使用软连接的形式:
ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-client-1.2.0-cdh5.14.2.jar ln -s /home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-it-1.2.0-cdh5.14.2.jar ln -s home/hadoop/module/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar /home/hadoop/module/hive-1.1.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar
2 修改 Hive 的配置文件 hive-site.xml 添加自己的 zk 信息:
<property> <name>hive.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> </property>
3 修改 Hive 的配置文件 hive-env.sh 添加如下信息:
export HADOOP_HOME=/kkb/install/hadoop-2.6.0-cdh5.14.2/ export HBASE_HOME=/kkb/install/servers/hbase-1.2.0-cdh5.14.2 export HIVE_CONF_DIR=/kkb/install/hive-1.1.0-cdh5.14.2/conf
至此 准备工作完成。
02 hive表导入hbase
hive 中创建管理表(内部表)与hbase 表完成映射则hive管理表的数据会添加到 hbase 表中 ,命令如下:
create table course.hbase_score(id int,cname string,score int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties("hbase.columns.mapping" = "cf:name,cf:score") tblproperties("hbase.table.name" = "hbase_score");
从命令中可以看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 则值的对应列族下的字段,而 hive 表的 id 则会作为hbase表的 rowkey 进行存储。
通过向内部表插入数据即可完成数据查询结果的导入。
insert overwrite table course.hbase_score select id,cname,score from course.score;
最后查看 hbase 表即可看到数据。
03 hbase表导入hive
hbase 结果映射到 hive表比较简单,创建 hive 外部表即可:
CREATE external TABLE hbase2hive(id int, name string, score int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name,cf:score") TBLPROPERTIES("hbase.table.name" ="hbase_hive_score");
从命令中可以看出 hbase.table.name 是指的 hbase 表名,hbase.columns.mapping 的值则对应hive表的字段,而 hive 表的 id 则会作取 hbase表的 rowkey 进行存储。
至此,Hbase 与 Hive 的数据迁移就完成了。
3 Hbase 协处理器和基础 api
关于基础api这部分比较详细的介绍就在代码中了,再此我们就简单说一下Hbase 协处理器。
协处理器是为了解决Hbase早期版本的一些问题,如建立二次索引、复杂过滤器、求和计数分组计数等类sql操作以及访问控制等。
Hbase 提供两类协处理器:
-
observer 类似数据库的触发器,个人理解类似拦截器的功能;
-
endpoint 类似数据库的存储过程,可以实现类sql的统计操作。
协处理器的加载方式
01 静态加载实现
通过修改 hbase-site.xml 这个文件来实现, 如启动全局 aggregation,能过操纵所有的表数据。只需要在hbase-site.xml里面添加以下配置即可,修改完配置之后需要重启HBase集群。
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
为所有table加载了一个 cp class,可以用” ,”分割加载多个 class。
02 动态加载实现
启用表aggregation,只对特定的表生效。
下面以协处理器 observer 为例来简单说下操作过程:
1 创建 两张 hbase 表,user1 ,user2:
create 'user1','info; create 'user2','info';
2 协处理器代码开发,完成往 user1 表插入数据时,先往 user2 表插入数据,代码如下:
public class MyProcessor extends BaseRegionObserver { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { //获取连接 Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum","hadoop102:2181,hadoop103:2181:hadoop104:2181"); Connection connection = ConnectionFactory.createConnection(configuration); //涉及多个版本得问题 List<Cell> cells = put.get("info".getBytes(), "name".getBytes()); //将user1表的name 数据也插入到 user2 中 Cell nameCell = cells.get(0); Put put1 = new Put(put.getRow()); put1.add(nameCell); Table table = connection.getTable(TableName.valueOf("user2")); table.put(put1); table.close(); connection.close(); } }
3 将开发好的项目打包上传到 HDFS ,路径自定,假设是:
hdfs://hadoop102:9000/processor/processor.jar
4 将 jar 包挂载到 user1 表:
disable 'user1'; alter 'user1',METHOD => 'table_att','Coprocessor'=>'hdfs://hadoop102:9000/processor/processor.jar|com.bigdata.comprocessor.MyProcessor|1001|'; enabled 'user1';
com.bigdata.comprocessor.MyProcessor : 你程序的全类名;
1001 :协处理器编号,自定义即可,表中协处理器的编号不能重复。
5 测试向 user1 中插入数据,user2 是否有数据:
public class TestObserver { @Test public void testPut() throws IOException { //获取连接 Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181"); //创建连接对象 Connection connection = ConnectionFactory.createConnection(configuration); Table proc1 = connection.getTable(TableName.valueOf("user1")); Put put = new Put("1110001112".getBytes()); put.addColumn("info".getBytes(),"name".getBytes(),"hello".getBytes()); put.addColumn("info".getBytes(),"gender".getBytes(),"male".getBytes()); put.addColumn("info".getBytes(),"nationality".getBytes(),"test".getBytes()); proc1.put(put); proc1.close(); connection.close(); System.out.println("success"); } }
关于协处理器卸载:
disable 'user1' alter 'user1',METHOD=>'table_att_unset',NAME=>'coprocessor$1' enable 'user1'
协处理器 observer 大致开发流程就是这样的。关于基础 api 放在参考资料的项目中了。
至此,还留有一个问题就是 hbase 的 endpoint 协处理器,其实它解决的问题及时实现 min、 max、 avg、 sum、 distinct、 group by 等sql功能,这个问题我们放在下期,下期介绍一个基于 hbase 框架之上的框架 -- phoenix,Phoenix之于 Hbase ,就像 hive 之于 Hadoop,会完美的实现 hbase 的 sql 查询操作。
项目代码地址: https://github.com/fanpengyi/hbase-api
-- THE END --
- EMR(hadoop/hbase/phoenix)夸集群数据迁移采坑记录
- [cloudera hadoop]用sqoop将数据从mysql迁移到hbase中出现的错误
- 从零自学Hadoop(21):HBase数据模型相关操作下
- spark插入数据到hbase: org.apache.hadoop.conf.Configuration not Serializable Exeception
- hadoop 1.1.2和 hive 0.10 和hbase 0.94.9整合
- 大数据组件原理总结-Hadoop、Hbase、Kafka、Zookeeper、Spark
- [大数据]Hadoop 2.7.3 和Hbase 1.2.4安装教程
- Hadoop,Hbase,Zookeeper在虚拟机单节点中的整合
- hadoop2.7.2集群hive-1.2.1整合hbase-1.2.1
- 大数据Hadoop核心架构HDFS+MapReduce+Hbase+Hive内部机理详解
- Hadoop Hbase适合存储哪类数据?
- hbase安装配置(整合到hadoop)
- Hadoop+hbase+zookeeper整合
- hadoop生态系统学习之路(八)hbase与hive的数据同步以及hive与impala的数据同步
- Hbase数据迁移方案
- 王家林最受欢迎的一站式云计算大数据和移动互联网解决方案课程 V1(20140809)之Hadoop企业级完整训练:Rocky的16堂课(HDFS&MapReduce&HBase&Hive&Zookee
- hbase 数据export/import (No enum constant org.apache.hadoop.mapreduce.JobCounter.MB_MILLIS_MAPS)
- Hbase图片文件数据迁移到FastDFS
- 琐碎-hadoop2.2.0-hbase0.96.0-hive0.13.1整合
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 云计算