HDFS Source Code Analysis of EditLog: Reading Operations

2016-06-03 14:41
In "HDFS Source Code Analysis of EditLog: Obtaining the Edit Log Input Stream", we looked in detail at how the edit log input stream EditLogInputStream is obtained. Once we have the input stream, the next step is naturally to read data from it and process it. In "HDFS Source Code Analysis: EditLogTailer", while discussing edit log tailing, we also covered the following two consecutive processing steps:

4. Obtain the collection of edit log input streams, streams, from the edit log editLog; the streams returned cover transactions starting at the latest transaction ID plus one.
5. Call loadEdits() on the FSImage instance image, using the stream collection streams to load the edit log into the file system image FSImage of the target namesystem, and obtain the amount of edits loaded, editsLoaded.

So, after obtaining the collection streams of EditLogInputStream instances, we call FSImage's loadEdits() method to load the edit log into the file system image of the target namesystem. But how does HDFS actually read data from an edit log input stream? That is what this post explores in detail.

First, the core method loadEditRecords() of FSEditLogLoader, the main class responsible for loading edit logs, contains the following code:

while (true) {
  try {
    FSEditLogOp op;
    try {
      // Read an operation op from the edit log input stream in
      op = in.readOp();
      // If op is null, we have hit the end of the stream: break out
      if (op == null) {
        break;
      }
    } catch (Throwable e) {
      // ... code omitted ...
    }
    // ... code omitted ...
    try {
      // ... code omitted ...
      long inodeId = applyEditLogOp(op, fsDir, startOpt,
          in.getVersion(true), lastInodeId);
      if (lastInodeId < inodeId) {
        lastInodeId = inodeId;
      }
    } catch (RollingUpgradeOp.RollbackException e) {
      // ... code omitted ...
    } catch (Throwable e) {
      // ... code omitted ...
    }
    // ... code omitted ...
  } catch (RollingUpgradeOp.RollbackException e) {
    // ... code omitted ...
  } catch (MetaRecoveryContext.RequestStopException e) {
    // ... code omitted ...
  }
}

This loop reads one operation op from the edit log input stream in, then calls applyEditLogOp() to apply the operation to the in-memory metadata in FSNamesystem. Which raises the question: how is an operation read and parsed from the byte stream?

Next, let's see how an operation is read from the edit log input stream EditLogInputStream, starting with its readOp() method:

/**
 * Read an operation from the stream
 * @return an operation from the stream or null if at end of stream
 * @throws IOException if there is an error reading from the stream
 */
public FSEditLogOp readOp() throws IOException {
  FSEditLogOp ret;
  // If cachedOp is non-null, return it and clear the cache
  if (cachedOp != null) {
    ret = cachedOp;
    cachedOp = null;
    return ret;
  }
  // Otherwise fall through to nextOp()
  return nextOp();
}

Quite simple: if the cached cachedOp is non-null, return it and clear the cache; if it is null, call nextOp() instead. nextOp() is an abstract method in EditLogInputStream, so we have to look at a subclass implementation. Taking EditLogFileInputStream as an example, its nextOp() method is as follows:

@Override
protected FSEditLogOp nextOp() throws IOException {
  return nextOpImpl(false);
}
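As an aside, the cachedOp field consulted by readOp() above is a classic one-element pushback buffer: a caller can examine the next operation and stash it back, so that the following readOp() returns it again without touching the underlying stream. Below is a minimal, self-contained sketch of that idiom; the PushbackReader class and its Supplier-based source are hypothetical illustrations, not HDFS code.

import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// A one-element pushback buffer over any pull-based source (hypothetical sketch).
public class PushbackDemo {
  static class PushbackReader<T> {
    private final Supplier<T> source; // returns null at end of stream
    private T cached;                 // plays the role of cachedOp

    PushbackReader(Supplier<T> source) {
      this.source = source;
    }

    // Consume the next item, preferring the cached one, like readOp().
    T read() {
      if (cached != null) {
        T ret = cached;
        cached = null;
        return ret;
      }
      return source.get();
    }

    // Look at the next item without consuming it.
    T peek() {
      if (cached == null) {
        cached = source.get();
      }
      return cached;
    }
  }

  public static void main(String[] args) {
    Iterator<String> it = List.of("op1", "op2").iterator();
    PushbackReader<String> r =
        new PushbackReader<>(() -> it.hasNext() ? it.next() : null);
    System.out.println(r.peek()); // op1 (not consumed)
    System.out.println(r.read()); // op1 (served from the cache)
    System.out.println(r.read()); // op2
    System.out.println(r.read()); // null: end of stream
  }
}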

Following the call into nextOpImpl(), the code is as follows:

private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
  FSEditLogOp op = null;
  // Branch on the state of the edit log file input stream:
  switch (state) {
  case UNINIT: // uninitialized
    try {
      // Call init() to initialize the stream
      init(true);
    } catch (Throwable e) {
      LOG.error("caught exception initializing " + this, e);
      if (skipBrokenEdits) {
        return null;
      }
      Throwables.propagateIfPossible(e, IOException.class);
    }
    // Sanity check: the state must no longer be UNINIT here
    Preconditions.checkState(state != State.UNINIT);
    // Retry by calling nextOpImpl() again
    return nextOpImpl(skipBrokenEdits);
  case OPEN: // stream is open
    // Read an operation via FSEditLogOp.Reader's readOp()
    op = reader.readOp(skipBrokenEdits);
    if ((op != null) && (op.hasTransactionId())) {
      long txId = op.getTransactionId();
      if ((txId >= lastTxId) &&
          (lastTxId != HdfsConstants.INVALID_TXID)) {
        //
        // Sometimes, the NameNode crashes while it's writing to the
        // edit log. In that case, you can end up with an unfinalized edit log
        // which has some garbage at the end.
        // JournalManager#recoverUnfinalizedSegments will finalize these
        // unfinished edit logs, giving them a defined final transaction
        // ID. Then they will be renamed, so that any subsequent
        // readers will have this information.
        //
        // Since there may be garbage at the end of these "cleaned up"
        // logs, we want to be sure to skip it here if we've read everything
        // we were supposed to read out of the stream.
        // So we force an EOF on all subsequent reads.
        //
        long skipAmt = log.length() - tracker.getPos();
        if (skipAmt > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("skipping " + skipAmt + " bytes at the end " +
                "of edit log '" + getName() + "': reached txid " + txId +
                " out of " + lastTxId);
          }
          tracker.clearLimit();
          IOUtils.skipFully(tracker, skipAmt);
        }
      }
    }
    break;
  case CLOSED: // closed: fall through and return null
    break; // return null
  }
  return op;
}

The overall logic of nextOpImpl() branches on the state of the edit log file input stream (a self-contained sketch of this state-machine pattern follows the list):

1. If the state is UNINIT (uninitialized), call init() to initialize the stream, verify that the state is no longer UNINIT, and then call nextOpImpl() again;

2. If the state is OPEN, call FSEditLogOp.Reader's readOp() method to read an operation op;

3. If the state is CLOSED, return null directly.
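The UNINIT, OPEN, CLOSED triple is a small lazy-initialization state machine: the stream initializes itself on the first read, and a closed or exhausted stream reports end-of-stream as null rather than throwing. Here is a minimal, self-contained sketch of the same pattern; the LazyReaderDemo class and its names are hypothetical, not the HDFS implementation.

import java.util.Iterator;
import java.util.List;

// A lazily-initialized reader with an explicit lifecycle (hypothetical sketch).
public class LazyReaderDemo {
  private enum State { UNINIT, OPEN, CLOSED }

  private final List<String> records;
  private State state = State.UNINIT;
  private Iterator<String> it;

  LazyReaderDemo(List<String> records) {
    this.records = records;
  }

  // Mirrors nextOpImpl(): initialize on first use, read while open,
  // and report end-of-stream as null once closed or exhausted.
  String next() {
    switch (state) {
    case UNINIT:
      it = records.iterator(); // plays the role of init()
      state = State.OPEN;
      return next();           // retry, as nextOpImpl() re-invokes itself
    case OPEN:
      if (it.hasNext()) {
        return it.next();
      }
      state = State.CLOSED;
      return null;
    case CLOSED:
    default:
      return null;             // a closed stream always reports EOF
    }
  }

  public static void main(String[] args) {
    LazyReaderDemo r = new LazyReaderDemo(List.of("OP_ADD", "OP_CLOSE"));
    System.out.println(r.next()); // OP_ADD (init happens on this first call)
    System.out.println(r.next()); // OP_CLOSE
    System.out.println(r.next()); // null: stream is now CLOSED
  }
}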

Let's now focus on FSEditLogOp.Reader's readOp() method, whose code is as follows:

/**
 * Read an operation from the input stream.
 *
 * Note that the objects returned from this method may be re-used by future
 * calls to the same method.
 *
 * @param skipBrokenEdits If true, attempt to skip over damaged parts of
 * the input stream, rather than throwing an IOException
 * @return the operation read from the stream, or null at the end of the
 *         file
 * @throws IOException on error. This function should only throw an
 *         exception when skipBrokenEdits is false.
 */
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
  while (true) {
    try {
      // Delegate the actual decoding to decodeOp()
      return decodeOp();
    } catch (IOException e) {
      in.reset();
      if (!skipBrokenEdits) {
        throw e;
      }
    } catch (RuntimeException e) {
      // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
      // However, we handle it here for recovery mode, just to be more
      // robust.
      in.reset();
      if (!skipBrokenEdits) {
        throw e;
      }
    } catch (Throwable e) {
      in.reset();
      if (!skipBrokenEdits) {
        throw new IOException("got unexpected exception " +
            e.getMessage(), e);
      }
    }
    // Move ahead one byte and re-try the decode process.
    if (in.skip(1) < 1) {
      return null;
    }
  }
}
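The recovery strategy here deserves a remark. decodeOp() marks the stream before each attempt (as we will see below), so on any failure the reader can reset() back to the mark; if skipBrokenEdits is true, it then advances one byte and retries, scanning forward byte by byte until a record decodes cleanly or the stream ends. The following self-contained sketch demonstrates that resynchronization loop over a plain DataInputStream; the one-record "format" (a magic byte followed by a payload byte) is invented purely for illustration.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class ResyncDemo {
  // Try to decode one record at the current position; throws on garbage.
  // The format (magic byte 0x42 followed by one payload byte) is invented.
  static int decodeRecord(DataInputStream in) throws IOException {
    if (in.readByte() != 0x42) {
      throw new IOException("bad magic");
    }
    return in.readUnsignedByte();
  }

  // Mirrors readOp(skipBrokenEdits = true): mark, try to decode,
  // and on failure reset and skip one byte before retrying.
  static Integer readRecordSkippingGarbage(DataInputStream in)
      throws IOException {
    while (true) {
      in.mark(2); // enough to cover one record attempt
      try {
        return decodeRecord(in);
      } catch (IOException e) {
        in.reset(); // rewind to the first problematic byte
      }
      if (in.skip(1) < 1) {
        return null; // end of stream: nothing left to resync against
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = { 0x00, 0x13, 0x42, 0x07 }; // two garbage bytes, then a record
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    System.out.println(readRecordSkippingGarbage(in)); // prints 7
  }
}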

Digging into decodeOp(), the code is as follows:

/**
 * Read an opcode from the input stream.
 *
 * @return the opcode, or null on EOF.
 *
 * If an exception is thrown, the stream's mark will be set to the first
 * problematic byte. This usually means the beginning of the opcode.
 */
private FSEditLogOp decodeOp() throws IOException {
  limiter.setLimit(maxOpSize);
  in.mark(maxOpSize);
  if (checksum != null) {
    checksum.reset();
  }
  byte opCodeByte;
  try {
    // Read a single byte from the input stream in: the opcode
    opCodeByte = in.readByte();
  } catch (EOFException eof) {
    // EOF at an opcode boundary is expected.
    return null;
  }
  // Convert the raw byte into an FSEditLogOpCodes value opCode
  FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
  if (opCode == OP_INVALID) {
    verifyTerminator();
    return null;
  }
  // Look up the FSEditLogOp instance op for this opCode in the cache
  FSEditLogOp op = cache.get(opCode);
  if (op == null) {
    throw new IOException("Read invalid opcode " + opCode);
  }
  // If the layout stores the edit log op length, consume an int
  if (supportEditLogLength) {
    in.readInt();
  }
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
    // If transaction IDs are stored, read a long and set it as the
    // transaction ID of op
    op.setTransactionId(in.readLong());
  } else {
    // Otherwise mark the transaction ID of op as invalid (-12345)
    op.setTransactionId(HdfsConstants.INVALID_TXID);
  }
  // Read the remaining fields from the input stream in into op
  op.readFields(in, logVersion);
  validateChecksum(in, checksum, op.txid);
  return op;
}

The logic of decodeOp() is simple (a sketch of the resulting record layout follows the list):

1. Read one byte, opCodeByte, from the input stream in to determine the operation type;

2. Convert the raw byte opCodeByte into an FSEditLogOpCodes value opCode;

3. Look up the FSEditLogOp instance op for opCode in the cache; this gives us the operation object;

4. If the layout stores the edit log op length, consume an int from the stream;

5. If transaction IDs are stored, read a long and set it as the transaction ID of op; otherwise set the transaction ID to -12345 (HdfsConstants.INVALID_TXID);

6. Call op's readFields() method to read the remaining fields from the input stream in into op.
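To make that layout concrete, here is a small, self-contained decoder for a record shaped like the one decodeOp() consumes: an opcode byte, an optional length int, a transaction-ID long, then an opcode-specific body. The opcode value, field set, and the use of readUTF() for the body are hypothetical; the real format also ends with a checksum, which validateChecksum() verifies and this sketch omits.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class MiniOpDecoder {
  static final byte OP_MKDIR = 1;           // hypothetical opcode value
  static final boolean SUPPORT_LENGTH = true;

  // Decode one record: opcode byte, optional length int, txid long, body.
  static String decodeOp(DataInputStream in) throws IOException {
    byte opCodeByte;
    try {
      opCodeByte = in.readByte();           // 1) the opcode
    } catch (EOFException eof) {
      return null;                          // EOF at an opcode boundary is fine
    }
    if (opCodeByte != OP_MKDIR) {
      throw new IOException("Read invalid opcode " + opCodeByte);
    }
    if (SUPPORT_LENGTH) {
      in.readInt();                         // 2) record length, consumed and ignored
    }
    long txid = in.readLong();              // 3) the transaction ID
    String path = in.readUTF();             // 4) opcode-specific body
    return "MKDIR txid=" + txid + " path=" + path;
  }

  public static void main(String[] args) throws IOException {
    java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
    java.io.DataOutputStream out = new java.io.DataOutputStream(bos);
    out.writeByte(OP_MKDIR);
    out.writeInt(0);                        // length placeholder
    out.writeLong(42L);
    out.writeUTF("/user/test");
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(decodeOp(in));       // MKDIR txid=42 path=/user/test
  }
}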

Next, let's look at an operation's readFields() method. Since different kinds of operations carry different attributes, their readFields() implementations naturally differ. Taking the AddCloseOp operation as an example, its readFields() method is as follows:

@Override
void readFields(DataInputStream in, int logVersion)
    throws IOException {
  // Length: if the layout stores a length, read an int into length
  if (!NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
    this.length = in.readInt();
  }
  // Inode ID: if the layout stores inode IDs, read a long into inodeId;
  // otherwise fall back to the grandfather inode ID
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
    this.inodeId = in.readLong();
  } else {
    // The inodeId should be updated when this editLogOp is applied
    this.inodeId = INodeId.GRANDFATHER_INODE_ID;
  }
  // Version compatibility check
  if ((-17 < logVersion && length != 4) ||
      (logVersion <= -17 && length != 5 && !NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
    throw new IOException("Incorrect data format." +
        " logVersion is " + logVersion +
        " but writables.length is " +
        length + ". ");
  }
  // Path: read a String from the input stream into path
  this.path = FSImageSerialization.readString(in);
  // Replication factor and modification time: read a short and a long
  // into replication and mtime
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
    this.replication = FSImageSerialization.readShort(in);
    this.mtime = FSImageSerialization.readLong(in);
  } else {
    this.replication = readShort(in);
    this.mtime = readLong(in);
  }
  // Access time: if the layout stores access times, read a long into
  // atime; otherwise atime defaults to 0
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
    if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
      this.atime = FSImageSerialization.readLong(in);
    } else {
      this.atime = readLong(in);
    }
  } else {
    this.atime = 0;
  }
  // Block size: read a long into blockSize
  if (NameNodeLayoutVersion.supports(
      LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
    this.blockSize = FSImageSerialization.readLong(in);
  } else {
    this.blockSize = readLong(in);
  }
  // Blocks: read the block array via readBlocks() into blocks
  this.blocks = readBlocks(in, logVersion);
  // Permissions: read the permission status into permissions
  this.permissions = PermissionStatus.read(in);
  // For an ADD operation, additionally read the ACL entries, xattrs,
  // client name clientName, client machine clientMachine, overwrite
  // flag, storage policy ID, and RPC ids
  if (this.opCode == OP_ADD) {
    aclEntries = AclEditLogUtil.read(in, logVersion);
    this.xAttrs = readXAttrsFromEditLog(in, logVersion);
    this.clientName = FSImageSerialization.readString(in);
    this.clientMachine = FSImageSerialization.readString(in);
    if (NameNodeLayoutVersion.supports(
        NameNodeLayoutVersion.Feature.CREATE_OVERWRITE, logVersion)) {
      this.overwrite = FSImageSerialization.readBoolean(in);
    } else {
      this.overwrite = false;
    }
    if (NameNodeLayoutVersion.supports(
        NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
      this.storagePolicyId = FSImageSerialization.readByte(in);
    } else {
      this.storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
    }
    // read clientId and callId
    readRpcIds(in, logVersion);
  } else {
    this.clientName = "";
    this.clientMachine = "";
  }
}
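Before moving on, note the pattern: readFields() is a version-gated decoder. Every optional field is wrapped in a NameNodeLayoutVersion.supports(...) check, so one reader can parse records written by several generations of the on-disk format, substituting defaults (such as atime = 0) for fields older writers never emitted. A minimal, self-contained sketch of the idiom follows; the feature constant and fields are invented, and unlike real NameNode layout versions (which are negative and decrease as features are added) the sketch uses simple increasing version numbers for readability.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class VersionedRecord {
  // Hypothetical feature flag: access times were added in version 2.
  static final int FEATURE_ACCESS_TIME_SINCE = 2;

  static boolean supports(int featureSince, int logVersion) {
    return logVersion >= featureSince;
  }

  long mtime;
  long atime;

  // Read fields, defaulting those the writer's version never emitted.
  void readFields(DataInputStream in, int logVersion) throws IOException {
    this.mtime = in.readLong();
    if (supports(FEATURE_ACCESS_TIME_SINCE, logVersion)) {
      this.atime = in.readLong();
    } else {
      this.atime = 0; // older layouts did not store atime
    }
  }

  public static void main(String[] args) throws IOException {
    java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
    new java.io.DataOutputStream(bos).writeLong(1234L); // a v1 record: mtime only
    VersionedRecord r = new VersionedRecord();
    r.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())), 1);
    System.out.println(r.mtime + " " + r.atime); // 1234 0
  }
}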

There is not much else to explain here: the method simply reads, in order, each attribute the operation needs as it appears in the input stream. The readBlocks() method that reads the block array, however, deserves a closer look:

private static Block[] readBlocks(
    DataInputStream in,
    int logVersion) throws IOException {
  // Read the number of blocks, numBlocks, stored as an int
  int numBlocks = in.readInt();
  // Validate numBlocks: it must be non-negative and at most MAX_BLOCKS
  // (1024 * 1024 * 64)
  if (numBlocks < 0) {
    throw new IOException("invalid negative number of blocks");
  } else if (numBlocks > MAX_BLOCKS) {
    throw new IOException("invalid number of blocks: " + numBlocks +
        ". The maximum number of blocks per file is " + MAX_BLOCKS);
  }
  // Allocate the block array blocks with numBlocks entries
  Block[] blocks = new Block[numBlocks];
  // Read numBlocks blocks from the input stream
  for (int i = 0; i < numBlocks; i++) {
    // Construct a Block instance blk
    Block blk = new Block();
    // Populate it from the stream via Block's readFields()
    blk.readFields(in);
    // Store the block blk in the array blocks
    blocks[i] = blk;
  }
  // Return the block array blocks
  return blocks;
}

This is straightforward: first read the block count numBlocks from the stream to determine how many blocks follow, then allocate the array blocks of that size, and finally read numBlocks blocks from the stream, each time constructing a Block instance blk, populating it via Block's readFields(), and storing it in the array. Once all blocks have been read, the array is returned. Note the bounds check on numBlocks: validating a length prefix before allocating is a standard defense against a corrupt (or malicious) log triggering a huge allocation.

Now let's look at Block's readFields() method:

@Override // Writable
public void readFields(DataInput in) throws IOException {
  readHelper(in);
}

And continuing into readHelper():

final void readHelper(DataInput in) throws IOException {
  // Read a long from the stream: the block ID blockId
  this.blockId = in.readLong();
  // Read a long from the stream: the block size numBytes
  this.numBytes = in.readLong();
  // Read a long from the stream: the block's generation stamp
  this.generationStamp = in.readLong();
  // Validate: the block size numBytes must be non-negative
  if (numBytes < 0) {
    throw new IOException("Unexpected block size: " + numBytes);
  }
}

It simply reads, in order, the block ID blockId, the block size numBytes, and the block's generation stamp generationStamp from the input stream, all three as longs.
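Since Block follows Hadoop's Writable convention (a readFields() paired with a write() that emits the same fields in the same order), the round trip is easy to reproduce with plain java.io streams. The sketch below is a self-contained, simplified stand-in, not the real org.apache.hadoop.hdfs.protocol.Block:

import java.io.*;

// A simplified Block-like record: three longs, as readHelper() reads them.
public class MiniBlock {
  long blockId;
  long numBytes;
  long generationStamp;

  void write(DataOutput out) throws IOException {
    out.writeLong(blockId);
    out.writeLong(numBytes);
    out.writeLong(generationStamp);
  }

  void readFields(DataInput in) throws IOException {
    this.blockId = in.readLong();
    this.numBytes = in.readLong();
    this.generationStamp = in.readLong();
    if (numBytes < 0) {
      throw new IOException("Unexpected block size: " + numBytes);
    }
  }

  public static void main(String[] args) throws IOException {
    MiniBlock b = new MiniBlock();
    b.blockId = 1073741825L;
    b.numBytes = 134217728L;      // 128 MB
    b.generationStamp = 1001L;

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    b.write(new DataOutputStream(bos));

    MiniBlock copy = new MiniBlock();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(copy.blockId + " " + copy.numBytes + " "
        + copy.generationStamp);
  }
}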