您的位置:首页 > 大数据 > Hadoop

HDFS源码分析数据块之CorruptReplicasMap

2016-04-13 13:47 489 查看
        CorruptReplicasMap用于存储文件系统中所有损坏数据块的信息。仅当它的所有副本损坏时一个数据块才被认定为损坏。当汇报数据块的副本时,我们隐藏所有损坏副本。一旦一个数据块被发现完好副本达到预期,它将从CorruptReplicasMap中被移除。

        我们先看下CorruptReplicasMap都有哪些成员变量,如下所示:

// 存储损坏数据块Block与它对应每个数据节点与损坏原因集合映射关系的集合
private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();        是的,你没看错,就这一个corruptReplicasMap集合,它是一个用来存储损坏数据块Block实例与它对应每个数据节点和损坏原因集合的映射关系的集合。
        我们看下CorruptReplicasMap都提供了哪些有用的方法。

        一、addToCorruptReplicasMap()

        标记属于指定数据节点的数据块为损坏 /**
* Mark the block belonging to datanode as corrupt.
* 标记属于指定数据节点的数据块为损坏
*
* @param blk Block to be added to CorruptReplicasMap
* @param dn DatanodeDescriptor which holds the corrupt replica
* @param reason a textual reason (for logging purposes)
* @param reasonCode the enum representation of the reason
*/
void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason, Reason reasonCode) {

// 先从corruptReplicasMap集合中查找是否存在对应数据块blk
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);

// 如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap
if (nodes == null) {
nodes = new HashMap<DatanodeDescriptor, Reason>();
corruptReplicasMap.put(blk, nodes);
}

String reasonText;
if (reason != null) {
reasonText = " because " + reason;
} else {
reasonText = "";
}

// 判断nodes中是否存在对应数据节点dn,分别记录日志信息
if (!nodes.keySet().contains(dn)) {
NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() +
" added as corrupt on " + dn +
" by " + Server.getRemoteIp() +
reasonText);
} else {

NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
"duplicate requested for " +
blk.getBlockName() + " to add as corrupt " +
"on " + dn +
" by " + Server.getRemoteIp() +
reasonText);
}

// Add the node or update the reason.
// 将数据节点dn、损坏原因编码reasonCode加入或更新入nodes
nodes.put(dn, reasonCode);
}        处理逻辑很简单,大体如下:
        1、先从corruptReplicasMap集合中查找是否存在对应数据块blk;

        2、如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap;

        3、判断nodes中是否存在对应数据节点dn,分别记录日志信息;

        4、将数据节点dn、损坏原因编码reasonCode加入或更新入nodes。

        二、removeFromCorruptReplicasMap()

        将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除 // 将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
Reason reason) {

// 先从corruptReplicasMap集合中查找是否存在对应数据块blk,获得datanodes
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);

// 如果不存在,直接返回false,表明移除失败
if (datanodes==null)
return false;

// if reasons can be compared but don't match, return false.

// 取出数据节点datanode对应的存储损坏原因storedReason
Reason storedReason = datanodes.get(datanode);

// 判断存储损坏原因storedReason与参数损坏原因reason是否一致,不一致直接返回false,表明移除失败,
// 判断的依据为参数损坏原因reason不是ANY且存储损坏原因storedReason不为空的情况下,两者不一致
if (reason != Reason.ANY && storedReason != null &&
reason != storedReason) {
return false;
}

// 将datanode对应数据从datanodes中移除
if (datanodes.remove(datanode) != null) { // remove the replicas

// 移除datanode后,如果datanodes为空
if (datanodes.isEmpty()) {
// remove the block if there is no more corrupted replicas
// 将数据块blk从集合corruptReplicasMap中移除
corruptReplicasMap.remove(blk);
}

// 返回true,表明移除成功
return true;
}

// 其他情况下直接返回false,表明移除失败
return false;
}        三、getCorruptReplicaBlockIds()
        获取指定大小和起始数据块ID的损坏数据块ID数组

/**
* Return a range of corrupt replica block ids. Up to numExpectedBlocks
* blocks starting at the next block after startingBlockId are returned
* (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
* is null, up to numExpectedBlocks blocks are returned from the beginning.
* If startingBlockId cannot be found, null is returned.
* 获取指定大小和起始数据块ID的损坏数据块ID数组
*
* @param numExpectedBlocks Number of block ids to return.
* 0 <= numExpectedBlocks <= 100
* @param startingBlockId Block id from which to start. If null, start at
* beginning.
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
*
*/
long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
Long startingBlockId) {

// 校验numExpectedBlocks,需要获取的数据块ID数组最多有100个元素
if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
return null;
}

// 获得corruptReplicasMap集合的数据块迭代器blockIt
Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();

// if the starting block id was specified, iterate over keys until
// we find the matching block. If we find a matching block, break
// to leave the iterator on the next block after the specified block.

// 如果设定了起始数据块艾迪startingBlockId
if (startingBlockId != null) {
boolean isBlockFound = false;

// 遍历corruptReplicasMap,查看是否存在startingBlockId,如果存在,跳出循环,此时已记录住迭代器的位置了
while (blockIt.hasNext()) {
Block b = blockIt.next();
if (b.getBlockId() == startingBlockId) {
isBlockFound = true;
break;
}
}

// 如果不存在,直接返回null
if (!isBlockFound) {
return null;
}
}

// 构造一个存储数据块ID的列表corruptReplicaBlockIds
ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();

// append up to numExpectedBlocks blockIds to our list

// 遍历corruptReplicasMap,将最多numExpectedBlocks个数据块ID添加到列表corruptReplicaBlockIds,
// 此时的迭代器可能不是从头开始取数据的,在startingBlockId需要并存在的情况下,它是从下一个元素开始获取的
for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
corruptReplicaBlockIds.add(blockIt.next().getBlockId());
}

// 将数据块ID列表corruptReplicaBlockIds转换成数组ret
long[] ret = new long[corruptReplicaBlockIds.size()];
for(int i=0; i<ret.length; i++) {
ret[i] = corruptReplicaBlockIds.get(i);
}

// 返回数据块ID数组ret
return ret;
}        四、getNodes()
        根据损坏数据块获取对应数据节点集合
/**
* Get Nodes which have corrupt replicas of Block
* 根据损坏数据块获取对应数据节点集合
*
* @param blk Block for which nodes are requested
* @return collection of nodes. Null if does not exists
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null)
return null;
return nodes.keySet();
}        五、isReplicaCorrupt()
        检测指定数据块和数据节点是否为损坏的

/**
* Check if replica belonging to Datanode is corrupt
* 检测指定数据块和数据节点是否为损坏的
*
* @param blk Block to check
* @param node DatanodeDescriptor which holds the replica
* @return true if replica is corrupt, false if does not exists in this map
*/
boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return ((nodes != null) && (nodes.contains(node)));
}        六、numCorruptReplicas()
        获取给定数据块对应数据节点数量

// 获取给定数据块对应数据节点数量
int numCorruptReplicas(Block blk) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return (nodes == null) ? 0 : nodes.size();
}        七、size()
        获取损坏数据块数量
// 获取损坏数据块数量
int size() {
return corruptReplicasMap.size();
}

        
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: