
A brief look at the HBase bulk load source code: HFileOutputFormat and LoadIncrementalHFiles

2013-11-28 17:24
Please credit the original source when reposting: http://blog.csdn.net/lonelytrooper/article/details/17002523

HBase bulk load mainly involves the following classes:

HFileOutputFormat

LoadIncrementalHFiles

PutSortReducer

KeyValueSortReducer

SimpleTotalOrderPartitioner

TotalOrderPartitioner

The first two matter most, in particular the public static void configureIncrementalLoad(Job job, HTable table) method in HFileOutputFormat and the public void doBulkLoad(Path hfofDir, final HTable table) method in LoadIncrementalHFiles.
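Before digging into the source, here is a minimal driver sketch of how the two methods are typically wired together (the BulkLoadDriver and MyPutMapper class names and the argument layout are my own inventions for illustration; MyPutMapper is sketched after the first listing below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: args[0] = input dir, args[1] = HFile output dir, args[2] = table name.
public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulk-load-example");
    job.setJarByClass(BulkLoadDriver.class);

    // The mapper emits ImmutableBytesWritable row keys and Put values, so
    // configureIncrementalLoad will pick PutSortReducer automatically.
    job.setMapperClass(MyPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path output = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, output);

    HTable table = new HTable(conf, args[2]);
    // Sets the partitioner, reducer, reduce count, output format, compression, etc.
    HFileOutputFormat.configureIncrementalLoad(job, table);

    if (!job.waitForCompletion(true)) {
      System.exit(1);
    }

    // Move the generated HFiles into the table's regions.
    new LoadIncrementalHFiles(conf).doBulkLoad(output, table);
  }
}

In short, configureIncrementalLoad does the job setup and doBulkLoad moves the finished HFiles into the regions; the two listings below walk through both.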

Let's look at them one at a time:

HFileOutputFormat.configureIncrementalLoad(Job job, HTable table) 

// This method does the remaining job setup needed before the MapReduce job runs:
// it sets the partitioner, the reducer, the number of reduce tasks, and so on.
public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
  Configuration conf = job.getConfiguration();
  Class<? extends Partitioner> topClass;
  try {
    // Load the partitioner class: prefer Hadoop's own TotalOrderPartitioner (TOP), and
    // fall back to the copy bundled with HBase under hadoopbackport (they are really
    // the same thing...).
    topClass = getTotalOrderPartitionerClass();
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed getting TotalOrderPartitioner", e);
  }
  job.setPartitionerClass(topClass);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  // The reducer is chosen by the map output value type; the difference between the
  // two reducers is small and will be covered in the next post.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  // Fetch the start keys of all the table's regions and set the number of reduce
  // tasks to the number of regions.
  LOG.info("Looking up current regions for table " + table);
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  // The rest writes out the partition information; the real work is in writePartitions.
  Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + UUID.randomUUID()); // path of the partition file
  LOG.info("Writing partition information to " + partitionsPath);

  FileSystem fs = partitionsPath.getFileSystem(conf);
  // Sort the region start keys and drop the first one (only split points are needed,
  // and the first key is the empty start key), then write them to a SequenceFile for
  // the TotalOrderPartitioner to read.
  writePartitions(conf, partitionsPath, startKeys);
  partitionsPath.makeQualified(fs);

  URI cacheUri;
  try {
    // Below we make explicit reference to the bundled TOP. Its cheating.
    // We are assume the define in the hbase bundled TOP is as it is in
    // hadoop (whether 0.20 or 0.22, etc.)
    // Bind the partition file to the TOP's default path and add it to the
    // DistributedCache. Note that this hard-codes the bundled TOP again, which makes
    // the earlier class lookup rather pointless...
    cacheUri = new URI(partitionsPath.toString()
        + "#"
        + org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
  } catch (URISyntaxException e) {
    throw new IOException(e);
  }
  DistributedCache.addCacheFile(cacheUri, conf);
  DistributedCache.createSymlink(conf);

  // Set compression algorithms based on column families
  // Write the table's compression and bloom filter settings into the conf.
  configureCompression(table, conf);
  configureBloomType(table, conf);
  // Ship the dependency jars with the job.
  TableMapReduceUtil.addDependencyJars(job);
  LOG.info("Incremental table output configured.");
}
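Since the reducer is picked from the map output value type, a job that emits Put values gets PutSortReducer automatically. Here is a sketch of the MyPutMapper referred to in the driver above, assuming tab-separated input lines of the form rowkey<TAB>value and a made-up column family cf with qualifier col:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits ImmutableBytesWritable/Put pairs, which configureIncrementalLoad pairs
// with PutSortReducer. The input format and column names are assumptions.
public class MyPutMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
  @Override
  protected void map(LongWritable key, Text line, Context context)
      throws IOException, InterruptedException {
    String[] fields = line.toString().split("\t", 2);
    if (fields.length < 2) {
      return; // skip malformed lines
    }
    byte[] rowKey = Bytes.toBytes(fields[0]);
    Put put = new Put(rowKey);
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(fields[1]));
    context.write(new ImmutableBytesWritable(rowKey), put);
  }
}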
LoadIncrementalHFiles.doBulkLoad(Path hfofDir, final HTable table)

// doBulkLoad loads the HFiles under hfofDir into the given table.
public void doBulkLoad(Path hfofDir, final HTable table) throws TableNotFoundException,
    IOException {
  // First check that the table is available.
  final HConnection conn = table.getConnection();

  if (!conn.isTableAvailable(table.getTableName())) {
    throw new TableNotFoundException("Table " + Bytes.toStringBinary(table.getTableName())
        + "is not currently available.");
  }

  // initialize thread pools
  int nrThreads = cfg.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime()
      .availableProcessors());
  ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
  builder.setNameFormat("LoadIncrementalHFiles-%1$d");
  ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(), builder.build());
  ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);

  // LQI queue does not need to be threadsafe -- all operations on this queue
  // happen in this thread
  // A LoadQueueItem represents one HFile to be loaded; the LoadQueueItem abstraction
  // exists to cope with region splits that happen while the load is in progress.
  Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
  try {
    // Find the HFiles under hfofDir and add them to the queue.
    discoverLoadQueue(queue, hfofDir);
    int count = 0;

    if (queue.isEmpty()) {
      LOG.warn("Bulk load operation did not find any files to load in " + "directory "
          + hfofDir.toUri() + ". Does it contain files in "
          + "subdirectories that correspond to column family names?");
      return;
    }

    // If using secure bulk load
    // prepare staging directory and token
    // Mostly there for testing: when secure bulk load is enabled, do some extra setup.
    if (useSecure) {
      // This condition is here for unit testing
      // Since delegation token doesn't work in mini cluster
      if (User.isSecurityEnabled()) {
        FileSystem fs = FileSystem.get(cfg);
        userToken = fs.getDelegationToken("renewer");
      }
      bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
    }

    // Assumes that region splits can happen while this occurs.
    // If a region splits during the load, an HFile may have to be split into two parts;
    // the two resulting LoadQueueItems are added back to the queue, so the loop only
    // ends when the queue is empty.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      // In practice, reaching a second iteration means a split has occurred.
      final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occured while grouping HFiles, retry attempt " + count
            + " with " + queue.size() + " files remaining to group or split");
      }

      int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0);
      if (maxRetries != 0 && count >= maxRetries) {
        LOG.error("Retry attempted " + count + " times without completing, bailing out");
        return;
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      // groupOrSplitPhase uses startEndKeys to associate each LoadQueueItem in the queue
      // with its target region, including handling of splits that occur along the way;
      // there are plenty of details in there for the interested reader. One key point:
      // its inner groupOrSplit method returns an empty result for "group" and a non-empty
      // one for "split", and split items go back onto the queue. The ByteBuffer key of
      // the returned Multimap is the region start key.
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table, pool,
          queue, startEndKeys);

      // This is where the bulk load actually happens: each group of LoadQueueItems is
      // loaded into its target region. The key method is tryAtomicRegionLoad: on success
      // it returns an empty list, on failure it returns the items that need to be
      // retried, which are added back to the queue. After several layers of calls, the
      // actual loading is done by the bulkLoadHFiles method on HRegion.
      bulkLoadPhase(table, conn, pool, queue, regionGroups);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

  } finally {
    // Final cleanup; any HFiles that still could not be loaded after retries have their
    // paths logged.
    if (useSecure) {
      if (userToken != null) {
        try {
          userToken.cancel(cfg);
        } catch (Exception e) {
          LOG.warn("Failed to cancel HDFS delegation token.", e);
        }
      }
      if (bulkToken != null) {
        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
      }
    }
    pool.shutdown();
    if (queue != null && !queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append(" ").append(q.hfilePath).append('\n');
      }
      LOG.error(err);
    }
  }
}
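To make the grouping step a bit more concrete: the essence of groupOrSplit is locating, among the sorted region start keys, the region whose range contains the HFile's first row key; if the HFile's last key reaches past that region's end key, the file gets split and the halves go back onto the queue. A simplified sketch of just the lookup part (the RegionLocator class and findRegionIndex method are made up; this illustrates the idea, not the actual HBase implementation):

import java.util.Arrays;

import org.apache.hadoop.hbase.util.Bytes;

// Maps an HFile's first row key to the index of the region that should receive it.
public class RegionLocator {
  /**
   * @param regionStartKeys sorted start keys of all regions (the first is the empty key)
   * @param firstRowKey     first row key found in the HFile
   * @return index of the target region
   */
  public static int findRegionIndex(byte[][] regionStartKeys, byte[] firstRowKey) {
    int idx = Arrays.binarySearch(regionStartKeys, firstRowKey, Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // No exact match: binarySearch returns -(insertionPoint) - 1, and the containing
      // region is the one just before the insertion point.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }
}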

A follow-up post will take a look at the remaining classes, and there will also be a post on bulk load in practice, covering both HBase's built-in importtsv tool and writing your own MapReduce job to generate HFiles and load them.