
HBase and MapReduce Integration (2): Hdfs2HBase

2015-08-05 09:29
2) Parse data from files and import it into an HBase table (import)

Hdfs2HBase

Data in files on HDFS -> HBase table

MapReduce
* input: HDFS files (a sample line format is shown below)
* Mapper: OutputKey/OutputValue = Text (row key) / Put
* output: HBase table
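The mapper below splits each line on tab characters, uses the first field as the row key, and treats every remaining field as one cell encoded as columnFamily:qualifier=value. A hypothetical input line (the row key, family, and qualifier names here are only examples) would look like:

10001	info:name=zhangsan	info:age=25

which becomes a single Put with row key 10001 and two cells in the info column family.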

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Requirement:
 * Parse data from HDFS files and import it into an HBase table.
 * MapReduce
 *   input:  HDFS files
 *   Mapper: OutputKey/OutputValue
 *   output: HBase table
 */
public class HDFS2UserMapReduce extends Configured implements Tool {

    // Mapper Class
    // Mapper<LongWritable, Text, KEYOUT, VALUEOUT>: reads text lines from HDFS files
    public static class ReadHDFSMapper extends Mapper<LongWritable, Text, Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line: rowkey \t cf:cq=value \t cf:cq=value ...
            String lineValue = value.toString();
            String[] values = lineValue.split("\t");

            // the first field is the row key
            Put put = new Put(Bytes.toBytes(values[0]));
            for (int i = 0; i < values.length; i++) {
                if (i == 0) {
                    mapOutputKey.set(values[0]);
                    continue;
                }
                // remaining fields: columnFamily:qualifier=value
                String column = values[i];
                String cf = column.substring(0, column.indexOf(':'));
                String cq = column.substring(column.indexOf(':') + 1, column.indexOf('='));
                String val = column.substring(column.indexOf('=') + 1);
                put.add(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(val));
            }
            context.write(mapOutputKey, put);
        }
    }
    // Reducer Class
    public static class WriteBasicReducer extends
            TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            usage("Wrong number of arguments: " + args.length);
            System.exit(-1);
        }

        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        job.setMapperClass(ReadHDFSMapper.class);

        // input format: plain text files on HDFS
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath);

        // output: the HBase table named in args[0], via TableOutputFormat
        TableMapReduceUtil.initTableReducerJob(
                args[0],                  // output table
                WriteBasicReducer.class,  // reducer class
                job);

        // map-only job: with zero reducers, the mapper's Puts go straight to TableOutputFormat
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
        return 0;
    }

    /**
     * @param errorMsg Error message. Can be null.
     */
    private static void usage(final String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: HDFS2UserMapReduce <tablename> <inputdir>");
        System.err.println("Example: user hdfs://172.27.35.8:8020/user/hadoop/export/user");
    }

    public static void main(String[] args) throws Exception {
        // get configuration (loads hbase-site.xml on top of the Hadoop config)
        Configuration conf = HBaseConfiguration.create();

        // submit job
        int status = ToolRunner.run(conf, new HDFS2UserMapReduce(), args);

        // exit program
        System.exit(status);
    }
}
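To run the job, the target table and its column family must already exist, and the HBase jars must be on the MapReduce classpath. A minimal sketch, assuming the class is packaged as hdfs2hbase.jar (the jar name and the info column family are only examples) and reusing the table name and input path from the usage message above:

# create the target table in the hbase shell
create 'user', 'info'

# put the HBase jars on the MapReduce classpath, then submit the job
export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar hdfs2hbase.jar HDFS2UserMapReduce user hdfs://172.27.35.8:8020/user/hadoop/export/user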