HBase and MapReduce Integration 2 - Hdfs2HBase
2015-08-05 09:29
2) Parse data from files into an HBase table (import)
Hdfs2HBase
File-format data -> HBase table
MapReduce
* input: hdfs files
Mapper: OutputKey/OutputValue
* output: hbase table
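The mapper in the code below expects each input line to be a row key followed by tab-separated fields of the form cf:cq=val (the layout produced by the export job in part 1). A hypothetical sample line, assuming a column family named info (the family name and values are illustrative, not from the original post):

10001	info:name=zhangsan	info:age=20

Here 10001 becomes the row key, and each remaining field is split on ':' and '=' to obtain the column family, qualifier, and value.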
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Requirement: parse data from HDFS files into an HBase table (import)
 * MapReduce
 *   input:  hdfs files
 *   Mapper: OutputKey/OutputValue
 *   output: hbase table
 */
public class HDFS2UserMapReduce extends Configured implements Tool {

    // Mapper Class
    // Reads plain text lines from HDFS: Mapper<LongWritable, Text, Text, Put>
    public static class ReadHDFSMapper extends
            Mapper<LongWritable, Text, Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] values = lineValue.split("\t");

            // first field is the row key, remaining fields are cf:cq=val
            Put put = new Put(Bytes.toBytes(values[0]));
            for (int i = 0; i < values.length; i++) {
                if (i == 0) {
                    mapOutputKey.set(values[0]);
                    continue;
                }
                String column = values[i];
                String cf  = column.substring(0, column.indexOf(':'));
                String cq  = column.substring(column.indexOf(':') + 1, column.indexOf('='));
                String val = column.substring(column.indexOf('=') + 1);
                // Put.add(family, qualifier, value) is the pre-1.0 HBase API;
                // on newer versions use addColumn with the same arguments
                put.add(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(val));
            }
            context.write(mapOutputKey, put);
        }
    }

    // Reducer Class (not used when the job runs map-only, see setNumReduceTasks(0) below)
    public static class WriteBasicReducer extends
            TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            usage("Wrong number of arguments: " + args.length);
            System.exit(-1);
        }

        // create job
        Job job = Job.getInstance(this.getConf(),
                this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());
        job.setMapperClass(ReadHDFSMapper.class);

        // input format
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath);

        // output: write Puts into the target HBase table via TableOutputFormat
        TableMapReduceUtil.initTableReducerJob(
                args[0],                 // output table
                WriteBasicReducer.class, // reducer class
                job);

        // map-only job: the mapper's Puts go straight to the table
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
        return 0;
    }

    /*
     * @param errorMsg Error message. Can be null.
     */
    private static void usage(final String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: HDFS2UserMapReduce <tablename> <inputdir>");
        System.err.println("Examples: user hdfs://172.27.35.8:8020/user/hadoop/export/user");
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration conf = HBaseConfiguration.create();

        // submit job
        int status = ToolRunner.run(conf, new HDFS2UserMapReduce(), args);

        // exit program
        System.exit(status);
    }
}
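Before running the job, the target table must already exist and contain every column family referenced in the input data; otherwise the Puts are rejected when they reach the table. A minimal launch sketch, assuming a table named user with a column family info and a jar named hdfs2hbase.jar (all three names are assumptions for illustration):

hbase shell
> create 'user', 'info'

export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hdfs2hbase.jar HDFS2UserMapReduce user hdfs://172.27.35.8:8020/user/hadoop/export/user

Because setNumReduceTasks(0) makes the job map-only, the Puts emitted by ReadHDFSMapper are written directly by TableOutputFormat and WriteBasicReducer is never invoked; raising the reduce task count would route the Puts through the reducer instead.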
Related article
- HBase and MapReduce Integration 1 - HBase2Hdfs