您的位置:首页 > 大数据 > Hadoop

HDFS源码分析数据块复制之PendingReplicationBlocks

2016-06-03 14:44 519 查看
PendingReplicationBlocks实现了所有正在复制的数据块的记账工作。它实现以下三个主要功能:

1、记录此时正在复制的块;

2、一种对复制请求进行跟踪的粗粒度计时器;

3、一个定期识别未执行复制请求的线程。

我们先看下它内部有哪些成员变量,如下:

[java] view plain copy

// 块和正在进行的块复制信息的映射集合

private final Map<Block, PendingBlockInfo> pendingReplications;

// 复制请求超时的块列表

private final ArrayList<Block> timedOutItems;

// 后台工作线程

Daemon timerThread = null;

// 文件系统是否正在运行的标志位

private volatile boolean fsRunning = true;

//

// It might take anywhere between 5 to 10 minutes before

// a request is timed out.

// 在一个请求超时之前可能需要5到10分钟

// 请求超时阈值,默认为5分钟

private long timeout = 5 * 60 * 1000;

// 超时检查固定值:5分钟

private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;

首先是pendingReplications,它是块和正在进行的块复制信息的映射集合,所有正在复制的数据块及其对应复制信息都会被加入到这个集合。数据块复制信息PendingBlockInfo是对数据块开始复制时间timeStamp、待复制的目标数据节点列表List<DatanodeDescriptor>实例targets的一个封装,代码如下:

[java] view plain copy

/**

* An object that contains information about a block that

* is being replicated. It records the timestamp when the

* system started replicating the most recent copy of this

* block. It also records the list of Datanodes where the

* replication requests are in progress.

*

* 正在被复制的块信息。它记录系统开始复制块最新副本的时间,也记录复制请求正在执行的数据节点列表。

*/

static class PendingBlockInfo {

// 时间戳

private long timeStamp;

// 待复制的目标数据节点列表

private final List<DatanodeDescriptor> targets;

// 构造方法

PendingBlockInfo(DatanodeDescriptor[] targets) {

// 时间戳赋值为当前时间

this.timeStamp = now();

this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()

: new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));

}

long getTimeStamp() {

return timeStamp;

}

// 设置时间戳为当前时间

void setTimeStamp() {

timeStamp = now();

}

// 增加复制数量,即增加目标数据节点

void incrementReplicas(DatanodeDescriptor... newTargets) {

if (newTargets != null) {

for (DatanodeDescriptor dn : newTargets) {

targets.add(dn);

}

}

}

// 减少复制数量,即减少目标数据节点

void decrementReplicas(DatanodeDescriptor dn) {

targets.remove(dn);

}

// 获取复制数量,即或许待复制的数据节点数目

int getNumReplicas() {

return targets.size();

}

}

它的构造方法中,即将时间戳timeStamp赋值为当前时间,并且提供了设置时间戳为当前时间的setTimeStamp()方法。同时提供了增加复制数量、减少复制数量、获取复制数量相关的三个方法,均是对待复制的目标数据节点列表的增加、减少与计数操作,上面注释很清楚,不再详述!

另外两个比较重要的变量就是复制请求超时的块列表timedOutItems和后台工作线程timerThread。由后台工作线程周期性的检查pendingReplications列表中的待复制数据块,看看其是否超时,如果超时的话,将其加入timedOutItems列表。后台工作线程timerThread的初始化如下:

[java] view plain copy

// 启动块复制监控线程

void start() {

timerThread = new Daemon(new PendingReplicationMonitor());

timerThread.start();

}

它实际上是借助PendingReplicationMonitor来完成的。PendingReplicationMonitor实现了Runnable接口,是一个周期性工作的线程,用于浏览从未完成它们复制请求的数据块,这个从未完成实际上就是在规定时间内还未完成的数据块复制信息。PendingReplicationMonitor的实现如下:

[java] view plain copy

/*

* A periodic thread that scans for blocks that never finished

* their replication request.

* 一个周期性线程,用于浏览从未完成它们复制请求的数据块

*/

class PendingReplicationMonitor implements Runnable {

@Override

public void run() {

// 如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行

while (fsRunning) {

// 检查周期:取timeout,最高为5分钟

long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);

try {

// 检查方法

pendingReplicationCheck();

// 线程休眠period

Thread.sleep(period);

} catch (InterruptedException ie) {

if(LOG.isDebugEnabled()) {

LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);

}

}

}

}

/**

* Iterate through all items and detect timed-out items

* 通过所有项目迭代检测超时项目

*/

void pendingReplicationCheck() {

// 使用synchronized关键字对pendingReplications进行同步

synchronized (pendingReplications) {

// 获取集合pendingReplications的迭代器

Iterator<Map.Entry<Block, PendingBlockInfo>> iter =

pendingReplications.entrySet().iterator();

// 记录当前时间now

long now = now();

if(LOG.isDebugEnabled()) {

LOG.debug("PendingReplicationMonitor checking Q");

}

// 遍历pendingReplications集合中的每个元素

while (iter.hasNext()) {

// 取出每个<Block, PendingBlockInfo>条目

Map.Entry<Block, PendingBlockInfo> entry = iter.next();

// 取出Block对应的PendingBlockInfo实例pendingBlock

PendingBlockInfo pendingBlock = entry.getValue();

// 判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间

if (now > pendingBlock.getTimeStamp() + timeout) {

// 超过的话,

// 取出timeout实例block

Block block = entry.getKey();

// 使用synchronized关键字对timedOutItems进行同步

synchronized (timedOutItems) {

// 将block添加入复制请求超时的块列表timedOutItems

timedOutItems.add(block);

}

LOG.warn("PendingReplicationMonitor timed out " + block);

// 从迭代器中移除该条目

iter.remove();

}

}

}

}

}

在它的run()方法内,如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行,然后在while循环内:

1、先取检查周期period:取timeout,最高为5分钟;

2、调用pendingReplicationCheck()方法进行检查;

3、线程休眠period时间,再次进入while循环。

pendingReplicationCheck的实现逻辑也很简单,如下:

使用synchronized关键字对pendingReplications进行同步:

1、获取集合pendingReplications的迭代器iter;

2、记录当前时间now;

3、遍历pendingReplications集合中的每个元素:

3.1、取出每个<Block, PendingBlockInfo>条目;

3.2、取出Block对应的PendingBlockInfo实例pendingBlock;

3.3、判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间,超过的话:

3.3.1、取出timeout实例block;

3.3.2、使用synchronized关键字对timedOutItems进行同步,使用synchronized关键字对timedOutItems进行同步;

3.3.3、从迭代器中移除该条目。

PendingReplicationBlocks还提供了获取复制超时块数组的getTimedOutBlocks()方法,代码如下:

[java] view plain copy

/**

* Returns a list of blocks that have timed out their

* replication requests. Returns null if no blocks have

* timed out.

* 返回一个其复制请求已超时的数据块列表,如果没有则返回null

*/

Block[] getTimedOutBlocks() {

/ 使用synchronized关键字对timedOutItems进行同步

synchronized (timedOutItems) {

// 如果timedOutItems中没有数据,则直接返回null

if (timedOutItems.size() <= 0) {

return null;

}

// 将Block列表timedOutItems转换成Block数组

Block[] blockList = timedOutItems.toArray(

new Block[timedOutItems.size()]);

// 清空Block列表timedOutItems

timedOutItems.clear();

// 返回Block数组

return blockList;

}

}

PendingReplicationBlocks另外还提供了增加一个块到正在进行的块复制信息列表中的increment()方法和减少正在复制请求的数量的decrement()方法,代码如下:

[java] view plain copy

/**

* Add a block to the list of pending Replications

* 增加一个块到正在进行的块复制信息列表中

*

* @param block The corresponding block

* @param targets The DataNodes where replicas of the block should be placed

*/

void increment(Block block, DatanodeDescriptor[] targets) {

// 使用synchronized关键字对pendingReplications进行同步

synchronized (pendingReplications) {

// 根据Block实例block先从集合pendingReplications中查找

PendingBlockInfo found = pendingReplications.get(block);

if (found == null) {

// 如果没有找到,直接put进去,利用DatanodeDescriptor[]的实例targets构造PendingBlockInfo对象

pendingReplications.put(block, new PendingBlockInfo(targets));

} else {

// 如果之前存在,增加复制数量,即增加目标数据节点

found.incrementReplicas(targets);

// 设置时间戳为当前时间

found.setTimeStamp();

}

}

}

/**

* One replication request for this block has finished.

* Decrement the number of pending replication requests

* for this block.

* 针对给定数据块的一个复制请求已完成。针对该数据块,减少正在复制请求的数量。

*

* @param The DataNode that finishes the replication

*/

void decrement(Block block, DatanodeDescriptor dn) {

// 使用synchronized关键字对pendingReplications进行同步

synchronized (pendingReplications) {

// 根据Block实例block先从集合pendingReplications中查找

PendingBlockInfo found = pendingReplications.get(block);

if (found != null) {

if(LOG.isDebugEnabled()) {

LOG.debug("Removing pending replication for " + block);

}

// 减少复制数量,即减少目标数据节点

found.decrementReplicas(dn);

// 如果数据块对应的复制数量总数小于等于0,复制工作完成,

// 直接从pendingReplications集合中移除该数据块及其对应信息

if (found.getNumReplicas() <= 0) {

pendingReplications.remove(block);

}

}

}

}

以及统计块数量和块复制数量的方法,如下:

[java] view plain copy

/**

* The total number of blocks that are undergoing replication

* 正在被复制的块的总数

*/

int size() {

return pendingReplications.size();

}

/**

* How many copies of this block is pending replication?

* 块复制的总量

*/

int getNumReplicas(Block block) {

synchronized (pendingReplications) {

PendingBlockInfo found = pendingReplications.get(block);

if (found != null) {

return found.getNumReplicas();

}

}

return 0;

}

上述方法代码逻辑都很简单,而且注释也很详细,此处不再过多赘述!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: