您的位置:首页 > 其它

从HBase表1中读取数据,将最终结果写入到HBase表2,如何通过MapReduce实现?

2017-09-19 00:39 627 查看
需求一:

将hbase中‘student’表中的info:name和info:age两列数据取出并写入到hbase中‘user’表中的basic:XM和basic:NL

class ReadStudentMapper extends TableMapper

package hbaseapi.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that copies selected columns between two HBase tables.
 *
 * <p>Reads {@code info:name} and {@code info:age} from the source table
 * {@code student} and writes them to the target table {@code user} as
 * {@code basic:XM} and {@code basic:NL} respectively, preserving row keys.
 *
 * <p>Run via {@link ToolRunner} so that generic Hadoop options
 * ({@code -D...}, {@code -conf ...}) are honored.
 */
public class Student2UserMapReduce extends Configured implements Tool {

    private static final String SOURCE_TABLE = "student";
    private static final String TARGET_TABLE = "user";

    private static final byte[] SOURCE_FAMILY = Bytes.toBytes("info");
    // Requirement: target columns live in the "basic" family (basic:XM, basic:NL).
    // The original code mistakenly wrote to "info" in the target table.
    private static final byte[] TARGET_FAMILY = Bytes.toBytes("basic");

    // Step 1 : Mapper — transforms each source row into a Put for the target table.
    public static class ReadStudentMapper extends
            TableMapper<ImmutableBytesWritable, Put> {

        /**
         * Maps one source row: copies info:name -> basic:XM and
         * info:age -> basic:NL into a new {@link Put} keyed by the same row.
         * Rows with neither column produce no output (an empty Put would
         * fail at write time with "No columns to insert").
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            Put put = new Put(key.get());

            for (Cell cell : value.rawCells()) {
                // Only the "info" family is relevant (the Scan already
                // restricts to it, but keep the check for safety).
                if (!"info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    continue;
                }
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier)) {
                    put.add(TARGET_FAMILY, Bytes.toBytes("XM"),
                            CellUtil.cloneValue(cell));
                } else if ("age".equals(qualifier)) {
                    put.add(TARGET_FAMILY, Bytes.toBytes("NL"),
                            CellUtil.cloneValue(cell));
                }
            }

            // Skip rows that yielded no target columns.
            if (!put.isEmpty()) {
                context.write(key, put);
            }
        }
    }

    // Step 2 : Reducer — passes Puts through to the target table unchanged.
    public static class WriteUserReducer extends
            TableReducer<ImmutableBytesWritable, Put, NullWritable> {

        /** Emits every Put for this row key to the output table. */
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Context context) throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(NullWritable.get(), put);
            }
        }
    }

    // Step 3 : Driver — wires the scan, mapper, and reducer into a Job.
    @Override
    public int run(String[] args) throws Exception {
        // 1) Configuration (supplied by ToolRunner, includes HBase settings).
        Configuration conf = this.getConf();

        // 2) Create the job.
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Student2UserMapReduce.class);

        // 3) Configure the scan: restrict to the "info" family and apply the
        // settings recommended for MapReduce jobs (batch rows per RPC, and
        // don't pollute the region server block cache with a full-table scan).
        Scan scan = new Scan();
        scan.addFamily(SOURCE_FAMILY);
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Set Mapper.
        TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE,
                scan,
                ReadStudentMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        // Set Reducer.
        TableMapReduceUtil.initTableReducerJob(
                TARGET_TABLE,
                WriteUserReducer.class,
                job);

        // Set reduce count: at least one; adjust as required.
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("error with job!");
        }
        return 0;
    }

    /** Entry point: runs the job with HBase configuration via ToolRunner. */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(//
                conf, //
                new Student2UserMapReduce(), //
                args //
        );
        System.exit(status);
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hbase
相关文章推荐