Importing and exporting between HBase and HDFS with MapReduce
2016-06-14 10:28
Appendix code:
HBase ----> HDFS
HDFS ----> HBase (import into HBase via MapReduce)
HBase ----> HDFS
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HBase2HDFS {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, HBase2HDFS.class.getSimpleName());
        job.setJarByClass(HBase2HDFS.class);

        // A plain MR job reads its input through FileInputFormat and friends; to read
        // from HBase we use the helper class TableMapReduceUtil instead.
        // args[0] is the name of the source HBase table.
        TableMapReduceUtil.initTableMapperJob(args[0], new Scan(),
                HBase2HDFSMapper.class, Text.class, Text.class, job);
        // Ship the HBase dependency jars with the MR job.
        TableMapReduceUtil.addDependencyJars(job);

        job.setMapperClass(HBase2HDFSMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //FileOutputFormat.setOutputPath(job, new Path("/t1-out"));

        // The mapper already emits the final "rowkey \t name \t age" lines,
        // so no reducer is needed.
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
    }

    // TableMapper is a subclass of Mapper; its two type parameters are the map
    // output key and value types. Column families and qualifiers in HBase are
    // plain byte arrays, so they are passed as quoted strings via getBytes().
    static class HBase2HDFSMapper extends TableMapper<Text, Text> {
        private Text rowKeyText = new Text();
        private Text value = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result result,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // All cells of the current row are in the Result object. The old raw()
            // method could be used to walk them, but it is deprecated:
            /*
            KeyValue[] raw = result.raw();
            for (KeyValue keyValue : raw) {
                keyValue.getValue();
            }
            */

            /*
             * Desired output format (rowkey, name, age):
             *   1    zhangsan    13
             *   2    lisi        14
             */

            // To read one specific cell you address it by row key, column family and
            // column; getColumnLatestCell returns the cell with the newest timestamp,
            // so no explicit timestamp is needed.
            byte[] nameBytes = CellUtil.cloneValue(
                    result.getColumnLatestCell("cf".getBytes(), "name".getBytes()));
            byte[] ageBytes = CellUtil.cloneValue(
                    result.getColumnLatestCell("cf".getBytes(), "age".getBytes()));

            rowKeyText.set(key.get());
            value.set(new String(nameBytes) + "\t" + new String(ageBytes));
            // The output is already in the form "rowkey  name  age", so no reducer is required.
            context.write(rowKeyText, value);
        }
    }
}
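The job above scans every column of the table. Since the mapper only reads cf:name and cf:age, the Scan handed to initTableMapperJob can be narrowed before the job is submitted. A minimal sketch of that part of main(), assuming the same 'cf' family and job object as above:

Scan scan = new Scan();
// Only fetch the two columns the mapper actually uses.
scan.addColumn("cf".getBytes(), "name".getBytes());
scan.addColumn("cf".getBytes(), "age".getBytes());
scan.setCaching(500);        // rows fetched per RPC from the region servers; tune for the cluster
scan.setCacheBlocks(false);  // block caching is usually switched off for full MR scans

TableMapReduceUtil.initTableMapperJob(args[0], scan,
        HBase2HDFSMapper.class, Text.class, Text.class, job);

This replaces the initTableMapperJob call in main() and leaves the mapper unchanged.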
HDFS ----> HBase (import into HBase via MapReduce)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HDFS2HBaseImport {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Which table the data ends up in is configured here; args[0] is the target table.
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[0]);

        Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName());
        job.setJarByClass(HDFS2HBaseImport.class);

        // Ship the HBase dependency jars with the MR job.
        TableMapReduceUtil.addDependencyJars(job);

        job.setMapperClass(HDFS2HBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(HDFS2HBaseReducer.class);
        // The output goes to an HBase table, so use TableOutputFormat rather than a file output format.
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, args[1]);
        job.waitForCompletion(true);
    }

    static class HDFS2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text rowKeyText = new Text();
        private Text value = new Text();

        @Override
        protected void map(LongWritable key, Text text,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each input line looks like: rowkey \t name \t age
            String[] splits = text.toString().split("\t");
            rowKeyText.set(splits[0]);
            value.set(splits[1] + "\t" + splits[2]); // name\tage
            context.write(rowKeyText, value);
        }
    }

    // The reducer extends TableReducer, the counterpart of the TableMapper used for
    // the export. Its output value type is fixed to Mutation, and because the data
    // goes into HBase the output key is not needed, so NullWritable is used.
    static class HDFS2HBaseReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,
                Reducer<Text, Text, NullWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            for (Text text : v2s) {
                String[] splits = text.toString().split("\t");
                // Rows are written to HBase with a Put object: the row key plus the
                // column family, column and value of each cell.
                Put put = new Put(k2.toString().getBytes());
                put.add("cf".getBytes(), "name".getBytes(), splits[0].getBytes());
                put.add("cf".getBytes(), "age".getBytes(), splits[1].getBytes());
                // The key is unused when writing to HBase, so NullWritable is emitted
                // together with the populated Put.
                context.write(NullWritable.get(), put);
            }
        }
    }
}
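For the import job, TableMapReduceUtil can also wire up the reduce side, instead of setting TableOutputFormat.OUTPUT_TABLE and the output format by hand. A minimal sketch of main() under that approach, reusing the mapper and reducer defined above:

Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName());
job.setJarByClass(HDFS2HBaseImport.class);

job.setMapperClass(HDFS2HBaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, args[1]);

// Sets the reducer class, the TableOutputFormat and the target table (args[0]) in one
// call, and also ships the HBase dependency jars with the job.
TableMapReduceUtil.initTableReducerJob(args[0], HDFS2HBaseReducer.class, job);

job.waitForCompletion(true);

Either form produces the same job; initTableReducerJob just keeps the HBase-specific configuration in one place.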