您的位置:首页 > 大数据 > 人工智能

HDFS源码分析之EditLogTailer

2016-04-11 22:41 543 查看
        在FSNamesystem中,有这么一个成员变量,定义如下:

/**
* Used when this NN is in standby state to read from the shared edit log.
* 当NameNode处于standby状态时用于从共享的edit log读取数据
*/
private EditLogTailer editLogTailer = null;        editLogTailer是一个编辑日志edit log的追踪器,它的主要作用就是当NameNode处于standby状态时用于从共享的edit log读取数据。它的构造是在FSNamesystem的startStandbyServices()方法中,代码如下:
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();        利用当前FSNamesystem实例this和配置信息conf实例化一个EditLogTailer对象,然后调用其start()方法启动它。
        接下来我们看看EditLogTailer的实现,先来看下其成员变量,代码如下:

// 编辑日志跟踪线程EditLogTailerThread实例tailerThread
private final EditLogTailerThread tailerThread;

// HDFS配置信息Configuration实例conf
private final Configuration conf;

// 文件系统命名空间FSNamesystem实例namesystem
private final FSNamesystem namesystem;

// 文件系统编辑日志FSEditLog实例editLog
private FSEditLog editLog;

// Active NameNode地址InetSocketAddress
private InetSocketAddress activeAddr;

// 名字节点通信接口NamenodeProtocol
private NamenodeProtocol cachedActiveProxy = null;

/**
* The last transaction ID at which an edit log roll was initiated.
* 一次编辑日志滚动开始时的最新事务ID
*/
private long lastRollTriggerTxId = HdfsConstants.INVALID_TXID;

/**
* The highest transaction ID loaded by the Standby.
* StandBy NameNode加载的最高事务ID
*/
private long lastLoadedTxnId = HdfsConstants.INVALID_TXID;

/**
* The last time we successfully loaded a non-zero number of edits from the
* shared directory.
* 最后一次我们从共享目录成功加载一个非零编辑的时间
*/
private long lastLoadTimestamp;

/**
* How often the Standby should roll edit logs. Since the Standby only reads
* from finalized log segments, the Standby will only be as up-to-date as how
* often the logs are rolled.
* StandBy NameNode滚动编辑日志的时间间隔。
*/
private final long logRollPeriodMs;

/**
* How often the Standby should check if there are new finalized segment(s)
* available to be read from.
* StandBy NameNode检查是否存在可以读取的新的最终日志段的时间间隔
*/
private final long sleepTimeMs;        其中,比较重要的几个变量如下:
        1、EditLogTailerThread tailerThread:它是编辑日志跟踪线程,

        我们再来看下EditLogTailer的构造方法,如下:

public EditLogTailer(FSNamesystem namesystem, Configuration conf) {

// 实例化编辑日志追踪线程EditLogTailerThread
this.tailerThread = new EditLogTailerThread();

// 根据入参初始化配置信息conf和文件系统命名系统namesystem
this.conf = conf;
this.namesystem = namesystem;

// 从namesystem中获取editLog
this.editLog = namesystem.getEditLog();

// 最新加载edit log时间lastLoadTimestamp初始化为当前时间
lastLoadTimestamp = now();

// StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs
// 取参数dfs.ha.log-roll.period,参数未配置默认为2min
logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;

// 如果logRollPeriodMs大于等于0
if (logRollPeriodMs >= 0) {

// 调用getActiveNodeAddress()方法初始化Active NameNode地址activeAddr
this.activeAddr = getActiveNodeAddress();
Preconditions.checkArgument(activeAddr.getPort() > 0,
"Active NameNode must have an IPC port configured. " +
"Got address '%s'", activeAddr);
LOG.info("Will roll logs on active node at " + activeAddr + " every " +
(logRollPeriodMs / 1000) + " seconds.");
} else {
LOG.info("Not going to trigger log rolls on active node because " +
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
}

// StandBy NameNode检查是否存在可以读取的新的最终日志段的时间间隔sleepTimeMs
// 取参数dfs.ha.tail-edits.period,参数未配置默认为1min
sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;

LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
" sleepTime=" + sleepTimeMs);
}        下面,我们再看下这个十分重要的编辑日志追踪线程EditLogTailerThread的实现,它的构造方法很简单,没有什么可说的,我们着重看下它的run()方法,代码如下:
@Override
public void run() {
SecurityUtil.doAsLoginUserOrFatal(
new PrivilegedAction<Object>() {
@Override
public Object run() {
doWork();
return null;
}
});
}        run()方法内继而调用了doWork()方法,代码如下:
private void doWork() {

// 标志位shouldRun为true时一直循环
while (shouldRun) {
try {
// There's no point in triggering a log roll if the Standby hasn't
// read any more transactions since the last time a roll was
// triggered.
// 自从上次日志滚动触发以来,如果StandBy NameNode没有读到任何事务的话,没有点触发一次日志滚动,

// 如果是自从上次加载后过了太长时间,并且上次编辑日志滚动开始时的最新事务ID小于上次StandBy NameNode加载的最高事务ID
if (tooLongSinceLastLoad() &&
lastRollTriggerTxId < lastLoadedTxnId) {

// 触发Active NameNode进行编辑日志滚动
triggerActiveLogRoll();
}
/**
* Check again in case someone calls {@link EditLogTailer#stop} while
* we're triggering an edit log roll, since ipc.Client catches and
* ignores {@link InterruptedException} in a few places. This fixes
* the bug described in HDFS-2823.
*/

// 判断标志位shouldRun,如果其为false的话,退出循环
if (!shouldRun) {
break;
}

// 调用doTailEdits()方法执行日志追踪
doTailEdits();
} catch (EditLogInputException elie) {
LOG.warn("Error while reading edits from disk. Will try again.", elie);
} catch (InterruptedException ie) {
// interrupter should have already set shouldRun to false
continue;
} catch (Throwable t) {
LOG.fatal("Unknown error encountered while tailing edits. " +
"Shutting down standby NN.", t);
terminate(1, t);
}

// 线程休眠sleepTimeMs时间后继续工作
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
LOG.warn("Edit log tailer interrupted", e);
}
}
}        当标志位shouldRun为true时,doWork()方法一直在while循环内执行,其处理逻辑如下:
        1、如果是自从上次加载后过了太长时间,并且上次编辑日志滚动开始时的最新事务ID小于上次StandBy NameNode加载的最高事务ID,触发Active NameNode进行编辑日志滚动:

        自从上次加载后过了太长时间是根据tooLongSinceLastLoad()方法判断的,而触发Active NameNode进行编辑日志滚动则是通过triggerActiveLogRoll()方法来完成的;

        2、判断标志位shouldRun,如果其为false的话,退出循环;

        3、调用doTailEdits()方法执行日志追踪;

        4、线程休眠sleepTimeMs时间后继续执行上述工作。

        我们先来看下如果确定自从上次加载后过了太长时间,tooLongSinceLastLoad()方法代码如下:

/**
* @return true if the configured log roll period has elapsed.
*/
private boolean tooLongSinceLastLoad() {

// StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs大于0,
// 且最后一次我们从共享目录成功加载一个非零编辑的时间到现在的时间间隔大于logRollPeriodMs
return logRollPeriodMs >= 0 &&
(now() - lastLoadTimestamp) > logRollPeriodMs ;
}        它判断的主要依据就是,StandBy NameNode滚动编辑日志的时间间隔logRollPeriodMs大于0,且最后一次我们从共享目录成功加载一个非零编辑的时间到现在的时间间隔大于logRollPeriodMs。
        触发Active NameNode进行编辑日志滚动的triggerActiveLogRoll()方法代码如下:

/**
* Trigger the active node to roll its logs.
* 触发Active NameNode滚动日志
*/
private void triggerActiveLogRoll() {
LOG.info("Triggering log roll on remote NameNode " + activeAddr);
try {

// 获得Active NameNode的代理,并调用其rollEditLog()方法滚动编辑日志
getActiveNodeProxy().rollEditLog();

// 将上次StandBy NameNode加载的最高事务ID,即lastLoadedTxnId,赋值给上次编辑日志滚动开始时的最新事务ID,即lastRollTriggerTxId,
// 这么做是为了方便进行日志回滚
lastRollTriggerTxId = lastLoadedTxnId;
} catch (IOException ioe) {
LOG.warn("Unable to trigger a roll of the active NN", ioe);
}
}        它首先会获得Active NameNode的代理,并调用其rollEditLog()方法滚动编辑日志,然后将上次StandBy NameNode加载的最高事务ID,即lastLoadedTxnId,赋值给上次编辑日志滚动开始时的最新事务ID,即lastRollTriggerTxId,这么做是为了方便进行日志回滚以及逻辑判断。
        好了,最后我们看下最重要的执行日志追踪的doTailEdits()方法吧,代码如下:

@VisibleForTesting
void doTailEdits() throws IOException, InterruptedException {
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will
// deadlock.

// namesystem加写锁
namesystem.writeLockInterruptibly();
try {

// 通过namesystem获取文件系统镜像FSImage实例image
FSImage image = namesystem.getFSImage();

// 通过文件系统镜像FSImage实例image获取最新的事务ID
long lastTxnId = image.getLastAppliedTxId();

if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams;
try {

// 从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据
streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
// edits file hasn't been started yet.
LOG.warn("Edits tailer failed to find any streams. Will try again " +
"later.", ioe);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}

// Once we have streams to load, errors encountered are legitimate cause
// for concern, so we don't catch them here. Simple errors reading from
// disk are ignored.
long editsLoaded = 0;
try {

// 调用文件系统镜像FSImage实例image的loadEdits(),
// 利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage,
// 并获得编辑日志加载的大小editsLoaded
editsLoaded = image.loadEdits(streams, namesystem);
} catch (EditLogInputException elie) {
editsLoaded = elie.getNumEditsLoaded();
throw elie;
} finally {
if (editsLoaded > 0 || LOG.isDebugEnabled()) {
LOG.info(String.format("Loaded %d edits starting from txid %d ",
editsLoaded, lastTxnId));
}
}

if (editsLoaded > 0) {// 如果editsLoaded大于0
// 最后一次我们从共享目录成功加载一个非零编辑的时间lastLoadTimestamp更新为当前时间
lastLoadTimestamp = now();
}

// 上次StandBy NameNode加载的最高事务ID更新为image中最新事务ID
lastLoadedTxnId = image.getLastAppliedTxId();
} finally {

// namesystem去除写锁
namesystem.writeUnlock();
}
}        大体处理流程如下:
        1、首先,namesystem加写锁;

        2、通过namesystem获取文件系统镜像FSImage实例image;

        3、通过文件系统镜像FSImage实例image获取最新的事务ID,即lastTxnId;

        4、从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据:

        ps:注意,这个编辑日志输入流集合streams并非读取的是editLog对象中的数据,毕竟editLog也是根据namesystem来获取的,如果从其中读取数据再加载到namesystem中的fsimage中,没有多大意义,这个日志输入流实际上是通过Hadoop HA中的JournalNode来获取的,这个我们以后再分析。

        5、调用文件系统镜像FSImage实例image的loadEdits(),利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage,并获得编辑日志加载的大小editsLoaded;

        6、如果editsLoaded大于0,最后一次我们从共享目录成功加载一个非零编辑的时间lastLoadTimestamp更新为当前时间;

        7、上次StandBy NameNode加载的最高事务ID更新为image中最新事务ID;

        8、namesystem去除写锁。

        部分涉及FSImage、FSEditLog、JournalNode等的细节,限于篇幅,我们以后再分析!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: