您的位置:首页 > 编程语言 > PHP开发

Hbase使用MultiTableOutputFormat实现多表输出MapReduce job

2016-03-08 11:13 686 查看
参考:https://bigdatabuzz.wordpress.com/2012/04/24/how-to-write-to-multiple-hbase-tables-in-a-mapreduce-job/

我们经常遇到需要更新多个表从一个map中输出以减少程序的运行时间。一个简单的方法是使用一个multitableoutputformat。

要配置这个工作,我们需要设置outputformatclass到multitableoutputformat.class。

为了使输出格式类识别的表名,我们需要通过表名为context.write关键。

下面给出一个例子:

In Mapper :

public class Clean_TransMapper extends TableMapper<ImmutableBytesWritable, Put> {

private ImmutableBytesWritable tbl1 = new ImmutableBytesWritable (Bytes.toBytes("outPutTable"));
private ImmutableBytesWritable tbl2 = new ImmutableBytesWritable (Bytes.toBytes("outTempTable"));

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());

//具体的逻辑就不给你们看了,以下举例
String value = Bytes.toString(row.get() );
if ("ONE".equals(value)) {
context.write(tbl1, record);
} else {
context.write(tbl2, record);
}

}
}

In the Driver class,

public final class MultiTableWriteDriver  extends Configured  implements Tool {

.................
@Override
public int run (final String[] args) {

Configuration conf = HBaseConfiguration.create();
conf.set("mapreduce.job.jar","WeRroot\\WEB-INF\\lib\\hadoop-veiwhigh-0.0.1-SNAPSHOT.jar");
Job job = Job.getInstance(conf, "*******");
Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.setStartRow(Bytes.toBytes(setStartRow));
        scan.setStopRow(Bytes.toBytes(setStopRow));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration());
         job.setJarByClass(MrCleanTransJobTool.class); //
         TableMapReduceUtil.initTableMapperJob(inPutTable, // 输入表
                    scan, // Scan instance to control CF and attribute selection
                    *****.class, // mapper class
                    ImmutableBytesWritable.class, // mapper output key
                    Put.class, // mapper output value
                    job);
        job.setOutputFormatClass (MultiTableOutputFormat.class);
        job.setNumReduceTasks(0); //设置为0表示不需要reduce
        
        return job.waitForCompletion(true) ? 0 : -1;
}
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: