您的位置:首页 > 其它

Hbase通过BulkLoad快速导入数据

2016-12-11 20:54 489 查看
HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。大量的数据导入到Hbase中时速度回很慢,不过我们可以使用bulkload来导入。

BulkLoad的过程主要有以下部分:

1. 从数据源提取数据并上传到HDFS中。

2. 使用MapReduce作业准备数据

这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。该作业需要使用rowkey(行键)作为输出Key,KeyValue、Put或者Delete作为输出Value。MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。

3. 告诉RegionServers数据的位置并导入数据

这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

代码如下:

package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadMR {

public static class BulkMap extends Mapper<Object, Text, ImmutableBytesWritable, Put>{
public static final String SP = "\t";
@Override
protected void map(Object key,Text value,Context context)
throws IOException, InterruptedException {
String[] values = value.toString().split(SP);
if(values.length == 2){
byte[] rowKey = Bytes.toBytes(values[0]);
byte[] columnValue = Bytes.toBytes(values[1]);
byte[] family = Bytes.toBytes("d");
byte[] columnName = Bytes.toBytes("c");

ImmutableBytesWritable rowKwyWritable = new ImmutableBytesWritable(rowKey);
Put put = new Put(rowKey);
put.add(family, columnName, columnValue);
context.write(rowKwyWritable, put);

}

}
}

public static void main(String[] args) throws Exception {
String dst = args[0];
String out = args[1];
int splitMB = Integer.parseInt(args[2]);
String tableName = args[3];

Configuration conf = new Configuration();
conf.set("mapreduce.input.fileinput.split.maxsize", String.valueOf(splitMB * 1024 *1024));
conf.set("mapred.min.split.size", String.valueOf(splitMB * 1024 *1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(splitMB * 1024 *1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(splitMB * 1024 *1024));

Job job = new Job(conf,"BulkLoad");
job.setJarByClass(BulkLoadMR.class);
job.setMapperClass(BulkMap.class);
job.setReducerClass(PutSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

FileInputFormat.addInputPath(job, new Path(dst));
FileOutputFormat.setOutputPath(job, new Path(out));

Configuration hbaseconf = HBaseConfiguration.create();
HTable table = new HTable(hbaseconf, tableName);
HFileOutputFormat2.configureIncrementalLoad(job, table);

job.waitForCompletion(true);

//将生成的入库
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
loader.doBulkLoad(new Path(out), table);

}

}


注意:

map的输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息