Improving HBase MapReduce Performance with BulkLoad
2014-10-16 18:14
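As we know, when a massive amount of data is loaded into HBase for the first time, we usually choose BulkLoad.
A brief description of how BulkLoad works: (1) A MapReduce job formats its output, on either the map side or the reduce side, as HFiles, the underlying storage files of HBase. (2) BulkLoad is then invoked to import the HFiles generated by that job into the corresponding HBase table.
PS: Please note: (1) Loading HFiles is the fastest of all the loading approaches, provided that this is the first import and the table is empty. If the table already contains data, importing HFiles again will trigger region splits on the table. (2) For the final output, whether it comes from the map side or the reduce side, it is recommended to use only <ImmutableBytesWritable, KeyValue>.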
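An HFile stores its cells sorted by row key, compared byte by byte as unsigned values, which is why the job has to emit keys in that order before the files can be loaded. The following is a minimal, HBase-free sketch of that ordering; the class name RowKeyOrder and the sample keys are made up for illustration and are not part of the original post.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RowKeyOrder {
    // Compares byte arrays left to right, treating each byte as unsigned (0..255).
    // If one array is a prefix of the other, the shorter one sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    };

    // Sorts string row keys by the unsigned byte order of their UTF-8 encoding.
    static List<String> sortRowKeys(List<String> rowKeys) {
        List<byte[]> encoded = new ArrayList<>();
        for (String key : rowKeys) {
            encoded.add(key.getBytes(StandardCharsets.UTF_8));
        }
        encoded.sort(UNSIGNED_LEXICOGRAPHIC);
        List<String> sorted = new ArrayList<>();
        for (byte[] key : encoded) {
            sorted.add(new String(key, StandardCharsets.UTF_8));
        }
        return sorted;
    }

    public static void main(String[] args) {
        // prints [row1, row10, row2]
        System.out.println(sortRowKeys(Arrays.asList("row10", "row2", "row1")));
    }
}
```

Note that "row10" sorts before "row2": the order is byte-wise, not numeric, so numeric row keys are usually zero-padded when range order matters.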
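Now to the main topic. BulkLoad is indeed the fastest way to write to HBase, but when we run business analysis on data that is already in HBase, we tend to use the ordinary HBase MapReduce approach, as in the following demo: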
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import java.io.IOException;
public class HBaseMapReduceDemo {
    static Logger LOG = Logger.getLogger(HBaseMapReduceDemo.class);
    // Reads rows from the source table; the analysis logic is left as a placeholder.
    public static class Mapper1 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException {
            try {
                // context.write(key, value);
            } catch (Exception e) {
                LOG.error(e);
            }
        }
    }
    // Writes one Put per key to the destination table through the normal write path.
    public static class Reducer1 extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
            try {
                Put put = new Put(key.get());
                // put.add();
                context.write(key, put);
            } catch (Exception e) {
                LOG.error(e);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() replaces the deprecated HBaseConfiguration constructor.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "yp-name02,yp-name01,yp-data01");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // conf.set(TableInputFormat.INPUT_TABLE,"access_logs");
        Job job = new Job(conf, "HBaseMapReduceDemo");
        job.setJarByClass(HBaseMapReduceDemo.class);
        // job.setNumReduceTasks(2);
        Scan scan = new Scan();
        scan.setCaching(2500);       // fetch 2500 rows per RPC
        scan.setCacheBlocks(false);  // do not fill the block cache during a full scan
        TableMapReduceUtil.initTableMapperJob("srcHBaseTableName", scan, Mapper1.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
        // TableCreator.createTable(20, true, "OP_SUM");
        TableMapReduceUtil.initTableReducerJob("destHBasetableName", Reducer1.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
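With this approach, region splits occur while the massive data set is being inserted, and writes become extremely slow. The approach is still suitable, however, when modifying an existing HBase table.
For the case HBase -> MapReduce analysis -> new table, we instead use HBase -> MapReduce analysis -> BulkLoad -> new table.
The demo is as follows: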
Mapper:
public class MyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    static Logger LOG = Logger.getLogger(MyMapper.class);
    // Skeleton only: the class name and output types follow the initTableMapperJob call in the job; the analysis logic is a placeholder.
    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException {
        try {
            // context.write(key, value);
        } catch (Exception e) {
            LOG.error(e);
        }
    }
}
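Reducer:
public class MyReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
    static Logger LOG = Logger.getLogger(MyReducer.class);
    // COMMON_FAMILY is assumed to be a String (the job concatenates it into a path); QUALIFIER is a placeholder.
    static final byte[] FAMILY = Bytes.toBytes(YeepayConstant.COMMON_FAMILY);
    static final byte[] QUALIFIER = Bytes.toBytes("q");
    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
        try {
            // Placeholder analysis: count the values for this row and emit the count as a single cell.
            long count = 0;
            for (ImmutableBytesWritable ignored : values) count++;
            KeyValue kv = new KeyValue(key.copyBytes(), FAMILY, QUALIFIER, Bytes.toBytes(count));
            context.write(key, kv);
        } catch (Exception e) {
            LOG.error(e);
        }
    }
}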
Job and BulkLoad:
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// TableCreator, HdfsUtil and YeepayConstant are the author's own project helpers (imports omitted).
public abstract class JobBulkLoad {
    public void run(String[] args) throws Exception {
        try {
            // Both the source and the destination table names are required.
            if (args.length < 2) {
                System.err.println("usage: <srcTableName> <destTableName>");
                System.exit(-1);
                return;
            }
            String srcTableName = args[0];
            String destTableName = args[1];
            TableCreator.createTable(20, true, destTableName);
            // HBase settings
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "yp-name02,yp-name01,yp-data01");
            // conf.set("hbase.zookeeper.quorum", "nn01, nn02, dn01");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            // Job settings
            Job job = new Job(conf, "hbase2hbase-bulkload");
            job.setJarByClass(JobBulkLoad.class);
            // The number of regions determines the number of reducers and the row key range each reducer covers.
            HTable htable = new HTable(conf, destTableName);
            Scan scan = new Scan();
            scan.setCaching(2500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableMapperJob(srcTableName, scan, MyMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
            // TableMapReduceUtil.initTableReducerJob(destTableName, Common_Reducer.class, job);
            job.setReducerClass(MyReducer.class);
            Date now = new Date();
            Path output = new Path("/output/" + destTableName + "/" + now.getTime());
            System.out.println(output);
            FileOutputFormat.setOutputPath(job, output);
            // One call is enough: it sets the HFile output format, the partitioner and the reducer count.
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            // Do not bulk load the output of a failed job.
            if (!job.waitForCompletion(true)) {
                throw new IllegalStateException("MapReduce job failed; skipping bulk load");
            }
            // ----- Run BulkLoad -----
            // Project helper; presumably relaxes HDFS permissions so that HBase can move the generated HFiles.
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            htable.close();
            System.out.println("HFile data load success!");
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }
}
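The comment in the job above says that the number of regions determines the number of reducers and the row key range each reducer covers. The sketch below illustrates that idea without any HBase dependency: given the sorted start keys of the destination regions, a binary search finds the region, and hence the reducer, that a row key belongs to. The class name RegionLookup and the sample start keys are hypothetical, and this is a simplified illustration rather than HBase's own partitioner code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

public class RegionLookup {
    // Unsigned, byte-wise lexicographic order, the order HBase uses for row keys.
    static final Comparator<byte[]> ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    };

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // startKeys must be sorted and begin with the empty key (the first region has no lower bound).
    // Returns the index of the region whose range [startKey, nextStartKey) contains rowKey.
    static int regionIndex(byte[][] startKeys, byte[] rowKey) {
        int pos = Arrays.binarySearch(startKeys, rowKey, ORDER);
        // Exact match: the row key is a region start key.
        // Otherwise binarySearch returns -(insertionPoint) - 1, and the owning
        // region is the one just before the insertion point.
        return pos >= 0 ? pos : -pos - 2;
    }

    public static void main(String[] args) {
        // Four hypothetical regions: ["", "g"), ["g", "n"), ["n", "t"), ["t", end).
        byte[][] startKeys = { bytes(""), bytes("g"), bytes("n"), bytes("t") };
        for (String row : new String[] { "apple", "g", "melon", "zebra" }) {
            System.out.println(row + " -> reducer " + regionIndex(startKeys, bytes(row)));
        }
    }
}
```

With these sample start keys, "apple" goes to reducer 0, "g" and "melon" to reducer 1, and "zebra" to reducer 3. Because each reducer receives exactly one region's key range in sorted order, each HFile it writes fits inside a single region, which is what lets the load step hand files to regions without rewriting them.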