实现一个工具类,可以把HBase任意表的任意多的列导出到任意指定的HDFS中
2015-09-23 19:39
363 查看
package hbase; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class HBase2HdfsUtils { /** * args[0] 表名 * args[1] 列族、列名称列表,格式---列族:列 * args[2] 输出路径 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //获取Hbase的配置信息,从resources目录下的hbase-site.xml文件中获取配置信息 Configuration conf = HBaseConfiguration.create(); //设置列族、列名称信息列表参数,格式--列族:列 conf.set("FamilyColumnsList", args[1]); //申明一个客户端 Job job = Job.getInstance(conf, HBase2HdfsUtils.class.getSimpleName()); //打成jar包执行需要指定类名 job.setJarByClass(HBase2HdfsUtils.class); //指定HBase中需要导出表的信息,即map的输入 Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, Text.class, Text.class, job); //设置输入的配置信息:key、value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //从hbase导出数据到hdfs不需要reduce,所以设置reduce的任务数为0 job.setNumReduceTasks(0); //设置输出的配置信息:key、value的类型以及输出路径 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); //如果输出目录存在,则删除输出目录 Path path = new Path(args[2]); FileSystem fs = FileSystem.get(new URI(args[2]), new Configuration()); if(fs.exists(path)){ fs.delete(path, true); } FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); } static class MyMapper extends TableMapper<Text, Text>{ Text k2 = new Text(); Text v2 = new Text(); @Override protected void map( ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { k2.set(""); String v2Text = ""; String familyColumnsList = context.getConfiguration().get("FamilyColumnsList"); String[] splited = familyColumnsList.split(","); String title = ""; //标题 for (String split : splited) { String[] column = split.split(":"); //根据列族、列获取值 Cell cell = value.getColumnLatestCell(column[0].getBytes(), column[1].getBytes()); //判断据列族、列获取到的cell不为空,否则会报空指针错误 if(cell!=null){ title += new String(CellUtil.cloneQualifier(cell)) + ":" + new String(CellUtil.cloneQualifier(cell)) + "\t" ; v2Text += new String(CellUtil.cloneValue(cell)) + "\t" ; } } v2.set(title + "\n" + v2Text); context.write(k2, v2); } } }
相关文章推荐
- HDFS的基本概念(一)
- hdfs 上面block有异常处理流程
- Cloudera的CDH和Apache的Hadoop的区别
- HDFS文件写入
- tachyon与hdfs,以及spark整合
- HDFs数据读取过程
- hdfs文件读写bug2
- 关于 HDFS 的 file size 和 block size
- hdfs rack机架感知配置
- HDFS的快照原理和Hbase基于快照的表修复
- Hadoop 2.2.0编译安装
- HDFS读写文件BUG
- ubuntu从头开始搭建hadoop伪分布式环境
- hadoop Secondary NameNode作用
- HDFS 官方文档 中文
- hadoop完全分布式搭建
- Debug HDFS (远程调试HDFS)
- HDFS浅析
- HDFS浅析
- hdfs下载文件到本地