您的位置:首页 > 大数据 > Hadoop

HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

2016-06-03 14:36 826 查看
在《HDFS源码分析心跳汇报之数据结构初始化》一文中,我们了解到HDFS心跳相关的BlockPoolManager、BPOfferService、BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的。那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程。

首先,我们先看下

那么,BPServiceActor线程是通过什么样的流程来实现心跳的呢?我们来看下它正常工作的run()方法,代码如下:

[java] view plain copy







/**

* No matter what kind of exception we get, keep retrying to offerService().

* That's the loop that connects to the NameNode and provides basic DataNode

* functionality.

*

* Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can

* happen either at shutdown or due to refreshNamenodes.

*/

@Override

public void run() {

// 记录日志信息:starting to offer service

LOG.info(this + " starting to offer service");

try {

// 在一个while循环内,完成连接NameNode并握手操作,即初始化

while (true) {

// init stuff

try {

// setup storage

// 连接NameNode并握手

connectToNNAndHandshake();

break;

} catch (IOException ioe) {

// 如果存在异常

// Initial handshake, storage recovery or registration failed

// 现将运行状态runningState设置为初始化失败INIT_FAILED

runningState = RunningState.INIT_FAILED;

// 调用shouldRetryInit()方法判断初始化失败时是否可以重试

if (shouldRetryInit()) {

// Retry until all namenode's of BPOS failed initialization

// 记录error日志信息

LOG.error("Initialization failed for " + this + " "

+ ioe.getLocalizedMessage());

// 线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作

sleepAndLogInterrupts(5000, "initializing");

} else {

// 不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回

runningState = RunningState.FAILED;

LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);

return;

}

}

}

// 设置运行状态runningState为正在运行RUNNING

runningState = RunningState.RUNNING;

// 进入另一个while循环,不停的调用offerService()方法,

// 发送心跳给NameNode并接收来自NameNode,然后根据命令交给不同的组件去处理

// 循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true

while (shouldRun()) {

try {

offerService();

} catch (Exception ex) {

// 存在异常的话,记录error日志,并休眠5s

LOG.error("Exception in BPOfferService for " + this, ex);

sleepAndLogInterrupts(5000, "offering service");

}

}

// 设置运行状态runningState为已退出EXITED

runningState = RunningState.EXITED;

} catch (Throwable ex) {

LOG.warn("Unexpected exception in block pool " + this, ex);

runningState = RunningState.FAILED;

} finally {

LOG.warn("Ending block pool service for: " + this);

// 清空,释放占用的资源

cleanUp();

}

}

在run()方法的开始,也就是BPServiceActor线程刚启动时,会在一个while循环内,完成连接NameNode并握手操作,即初始化。这里,是通过调用connectToNNAndHandshake()方法完成与NameNode的连接并进行两次握手的。值得一提的是,如果出现了IOException异常,会先将运行状态runningState设置为初始化失败INIT_FAILED,然后调用shouldRetryInit()方法判断初始化失败时是否可以重试:

1、如果可以重试的话,记录error日志信息,线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作;

2、不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回。

接下来,设置运行状态runningState为正在运行RUNNING,进入另一个while循环,不停的调用offerService()方法,发送心跳给NameNode并接收来自NameNode的命令,然后根据命令交给不同的组件去处理,循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true;

而当循环过程中存在异常Exception的话,记录error日志,并休眠5s,然后继续循环,只有当shouldRun()方法返回false,才退出循环,设置运行状态runningState为已退出EXITED,最终调用cleanUp()方法释放占用的资源等。

上述就是BPServiceActor线程正常工作进行周期性心跳的主流程。下面,我们针对其中的某些细节进行详细描述。

首先,完成与NameNode的连接并进行两次握手的connectToNNAndHandshake()方法,实现如下:

[java] view plain copy







/**

* 连接NameNode并握手

*/

private void connectToNNAndHandshake() throws IOException {

// get NN proxy

// 利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode

bpNamenode = dn.connectToNN(nnAddr);

// First phase of the handshake with NN - get the namespace

// info.

// 与NameNode握手第一阶段:获取命名空间信息

NamespaceInfo nsInfo = retrieveNamespaceInfo();

// Verify that this matches the other NN in this HA pair.

// This also initializes our block pool in the DN if we are

// the first NN connection for this BP.

// 验证,并设置命名空间信息()

bpos.verifyAndSetNamespaceInfo(nsInfo);

// Second phase of the handshake with the NN.

// 与NameNode握手第二阶段,注册

register();

}

它的主要处理流程如下:

1、利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode
2、与NameNode握手第一阶段:调用retrieveNamespaceInfo()方法获取命名空间信息nsInfo;

3、通过bpos的verifyAndSetNamespaceInfo()方法进行验证,并设置命名空间信息nsInfo;

4、与NameNode握手第二阶段,调用register()方法进行注册。

接着,我们再看下实现周期性心跳的offerService()方法,代码如下:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: