
HDFS Block Reads and Writes -- the ${dfs.data.dir} Selection Policy

2014-11-26 18:27
  I recently needed to look at how HDFS reads and writes blocks for work. Unlike most posts you will find online, this article focuses on the ${dfs.data.dir} selection policy, i.e. how a block is placed across a DataNode's data directories. I read this part of the code mainly from the angle of what my work requires.

hdfs-site.xml
<property>
  <name>dfs.data.dir</name>
  <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
</property>


  The ${dfs.data.dir} selection policy is about which directory to use for a new block when a DataNode is configured with several ${dfs.data.dir} directories (as in the configuration above). Typically each directory is a mount point of a different disk, so choosing a directory for a block effectively decides which disk the block lands on.
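  As a quick illustration of how this comma-separated value becomes a list of candidate directories, the sketch below simply reads the property back with the standard Configuration API. It is a standalone illustration, not the DataNode's actual startup code, and it assumes the hdfs-site.xml shown above is on the classpath:

import org.apache.hadoop.conf.Configuration;

public class DataDirList {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource("hdfs-site.xml");   // assumes the file shown above is on the classpath

    // getStrings() splits the comma-separated value into individual directories
    String[] dataDirs = conf.getStrings("dfs.data.dir");
    if (dataDirs == null) {
      System.out.println("dfs.data.dir is not set");
      return;
    }
    // Each entry is one candidate directory (usually one per disk);
    // the DataNode builds one FSVolume per entry.
    for (String dir : dataDirs) {
      System.out.println("candidate data dir: " + dir);
    }
  }
}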

  Creating a file involves two steps:

  1. Before any block is written, the client talks to the NameNode to create the file (INodeFile / INodeFileUnderConstruction). The write request starts in the DFSClient's create() method and, via RPC, ends up in FSNamesystem's startFileInternal() method on the NameNode.

  2. For each block of the file, the client then asks the NameNode to allocate a new block; on the NameNode side this request is handled by FSNamesystem's getAdditionalBlock() method, shown below.
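  For reference, the client-side code that triggers both steps is just the ordinary FileSystem API. A minimal sketch (the path and payload are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);   // DistributedFileSystem when fs.default.name points at HDFS

    // create() goes through DFSClient to the NameNode, which creates the
    // INodeFileUnderConstruction (step 1 above).
    FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"));

    // As data is written and blocks fill up, the client asks the NameNode for
    // new blocks; on the NameNode side each request ends up in
    // FSNamesystem.getAdditionalBlock(), shown below (step 2).
    out.write("hello hdfs".getBytes("UTF-8"));
    out.close();

    fs.close();
  }
}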

/**
* The client would like to obtain an additional block for the indicated
* filename (which is being written-to).  Return an array that consists
* of the block, plus a set of machines.  The first on this list should
* be where the client writes data.  Subsequent items in the list must
* be provided in the connection to the first datanode.
*
* Make sure the previous blocks have been reported by datanodes and
* are replicated.  Will return an empty 2-elt array if we want the
* client to "try again later".
*/
// Ask the NameNode for a new block for this file
public LocatedBlock getAdditionalBlock(String src,
                                       String clientName,
                                       HashMap<Node, Node> excludedNodes
                                       ) throws IOException {
  long fileLength, blockSize;
  int replication;
  DatanodeDescriptor clientNode = null;
  Block newBlock = null;

  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
                                + src + " for " + clientName);

  synchronized (this) {
    if (isInSafeMode()) { // check safemode first for failing-fast
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    // have we exceeded the configured limit of fs objects.
    checkFsObjectLimit();

    INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

    //
    // If we fail this, bad things happen!
    //
    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }
    fileLength = pendingFile.computeContentSummary().getLength();
    blockSize = pendingFile.getPreferredBlockSize();
    clientNode = pendingFile.getClientNode();
    replication = (int)pendingFile.getReplication();
  }

  // choose targets for the new block to be allocated.
  // i.e. decide which DataNodes will hold the replicas
  DatanodeDescriptor targets[] = replicator.chooseTarget(src,
                                                         replication,
                                                         clientNode,
                                                         excludedNodes,
                                                         blockSize);
  if (targets.length < this.minReplication) {
    throw new IOException("File " + src + " could only be replicated to " +
                          targets.length + " nodes, instead of " +
                          minReplication);
  }

  // Allocate a new block and record it in the INode.
  synchronized (this) {
    if (isInSafeMode()) { // make sure it is not in safemode again.
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    INode[] pathINodes = dir.getExistingPathINodes(src);
    int inodesLen = pathINodes.length;
    checkLease(src, clientName, pathINodes[inodesLen-1]);
    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
                                             pathINodes[inodesLen - 1];

    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }

    // allocate new block, record block locations in INode.
    // (a new, non-conflicting blockID is generated and the block is recorded in the INode)
    newBlock = allocateBlock(src, pathINodes);
    pendingFile.setTargets(targets);

    for (DatanodeDescriptor dn : targets) {
      dn.incBlocksScheduled();
    }
    dir.persistBlocks(src, pendingFile);
  }
  if (persistBlocks) {
    getEditLog().logSync();
  }

  // Create next block
  LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
  if (isAccessTokenEnabled) {
    b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
        EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
  }
  return b;
}


  The method above also touches on the replica placement policy (chooseTarget), which I will leave for the next post. The figure below summarizes the call hierarchy of the method above:

[figure: call hierarchy of getAdditionalBlock]
  Finally, the focus of this post: how a block's storage location on the DataNode is chosen. The call hierarchy is as follows:

[figure: call hierarchy of block placement on the DataNode]
  First, a look at the data structures involved:

class FSVolume {                      // one volume, i.e. one ${dfs.data.dir}
  private File currentDir;            // finalized blocks, i.e. ${dfs.data.dir}/current
  private FSDir dataDir;              // the block files under currentDir
  private File tmpDir;                // temporary files, i.e. ${dfs.data.dir}/tmp
  private File blocksBeingWritten;    // blocks currently being written, i.e. ${dfs.data.dir}/blocksBeingWritten
  private File detachDir;             // copy-on-write (detach) area, i.e. ${dfs.data.dir}/detach
  private DF usage;                   // df-style free-space stats for the partition
  private DU dfsUsage;                // du-style stats for the space this volume uses
  private long reserved;              // space reserved for non-HDFS use


static class FSVolumeSet {      // the set of volumes, i.e. all configured ${dfs.data.dir} directories
  FSVolume[] volumes = null;    // one FSVolume per directory, organized as an array
  int curVolume = 0;            // index of the volume currently in use


  FSVolumeSet represents the collection of ${dfs.data.dir} directories: it keeps them in the volumes array and uses curVolume to track which ${dfs.data.dir} directory is currently being used. The ${dfs.data.dir} selection policy works as follows:

  When multiple ${dfs.data.dir} directories are configured, the DataNode picks an FSVolume from volumes in round-robin order to hold the block (the block first goes into the blocksBeingWritten directory and is moved to current once the write completes);

  After each block is placed, curVolume advances by one, so the ${dfs.data.dir} directories are written to in turn. The policy is implemented in FSDataset.FSVolumeSet's getNextVolume() method:

synchronized FSVolume getNextVolume(long blockSize) throws IOException {

  if (volumes.length < 1) {
    throw new DiskOutOfSpaceException("No more available volumes");
  }

  // since volumes could've been removed because of the failure
  // make sure we are not out of bounds
  if (curVolume >= volumes.length) {
    curVolume = 0;
  }

  int startVolume = curVolume;

  while (true) {
    FSVolume volume = volumes[curVolume];
    curVolume = (curVolume + 1) % volumes.length;    // advance the round-robin pointer
    if (volume.getAvailable() > blockSize) { return volume; }
    if (curVolume == startVolume) {
      throw new DiskOutOfSpaceException("Insufficient space for an additional block");
    }
  }
}
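  To make the effect of the policy concrete, here is a small standalone simulation (plain Java, not Hadoop code) of the same round-robin-with-space-check idea, using the three directories from the hdfs-site.xml above; the free-space figures are invented for illustration:

import java.util.Arrays;
import java.util.List;

public class RoundRobinSimulation {
  // Pretend free space (bytes) per configured data dir; values invented for the example.
  static long[] available = {200L << 20, 10L << 20, 500L << 20};
  static List<String> dirs = Arrays.asList(
      "/mnt/datadir1/data", "/mnt/datadir2/data", "/mnt/datadir3/data");
  static int curVolume = 0;

  // Same idea as FSVolumeSet.getNextVolume(): try the volumes in round-robin
  // order and skip any that cannot hold another block.
  static String nextVolume(long blockSize) {
    int startVolume = curVolume;
    while (true) {
      String dir = dirs.get(curVolume);
      long free = available[curVolume];
      curVolume = (curVolume + 1) % dirs.size();
      if (free > blockSize) {
        return dir;
      }
      if (curVolume == startVolume) {
        throw new RuntimeException("Insufficient space for an additional block");
      }
    }
  }

  public static void main(String[] args) {
    long blockSize = 64L << 20;   // 64 MB, the Hadoop 1.x default block size
    // datadir2 only has 10 MB free, so it is skipped every round:
    // prints datadir1, datadir3, datadir1, datadir3
    for (int i = 0; i < 4; i++) {
      System.out.println("block " + i + " -> " + nextVolume(blockSize));
    }
  }
}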


  Next, the read path. When a Map Task runs, its nextKeyValue() method reads data from a block; the main steps are:

  1. Based on the file offset and length specified when the Map Task was created, determine which block should be read and fetch that block's details (one round trip to the NameNode).

  2. Among the DataNodes holding the block, pick the best one and open a socket connection to it (local reads are not enabled by default).
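  From the client's point of view all of this is hidden behind FSDataInputStream; below is a minimal sketch of what a Map Task effectively does for its split (the path, offset and read size are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadSplitExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    long splitStart = 64L << 20;   // invented split offset: start of the second 64 MB block
    byte[] buf = new byte[4096];   // invented read size

    FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"));
    // seek() determines which block the offset falls into; the block's
    // locations are obtained from the NameNode (step 1 above).
    in.seek(splitStart);
    // read() then talks to one of the DataNodes holding that block over a
    // socket (step 2 above), unless local reads have been enabled.
    int n = in.read(buf, 0, buf.length);
    System.out.println("read " + n + " bytes starting at offset " + splitStart);

    in.close();
    fs.close();
  }
}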

  The call hierarchy is as follows:

[figure: call hierarchy of the read path]
  A Map Task reads its records through the RecordReader class; underneath, the actual block data is read by a block reader, which comes in two flavors:

BlockReaderLocal: reads a block directly from the local disk (without going through the DataNode)

RemoteBlockReader: reads a block over the network (through the DataNode)

  Even when the data is local, a Map Task reads it through RemoteBlockReader by default, i.e. over a socket; local reads are not enabled out of the box. They can be turned on as described at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html, in which case BlockReaderLocal reads the block straight from the local disk without going through the DataNode. The analysis below assumes BlockReaderLocal is being used.
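  As a rough sketch of what enabling local reads amounts to on the client side, the snippet below sets the property I understand the 1.x short-circuit read feature to use; the key name, and the DataNode-side dfs.block.local-path-access.user whitelist mentioned in the comments, are assumptions -- check the linked document for your exact release:

import org.apache.hadoop.conf.Configuration;

public class ShortCircuitReadConf {
  public static Configuration clientConf() {
    Configuration conf = new Configuration();
    // Ask the DFSClient to use BlockReaderLocal when the block lives on this machine.
    // Property name is my reading of the 1.x feature; verify against the docs.
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    // The DataNode must additionally whitelist the reading user
    // (dfs.block.local-path-access.user in the 1.x line).
    return conf;
  }
}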

First, the data structures involved:

public class BlockLocalPathInfo implements Writable {    // describes where a block lives on the local machine

  private Block block;                  // the block in question
  private String localBlockPath = "";   // local path of the block file
  private String localMetaPath = "";    // local path of the block's checksum (meta) file


// Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {    // one DataNode on this machine (a machine may run several DataNodes)
  private final Map<Block, BlockLocalPathInfo> cache;    // block -> local path info for that block


// Multiple datanodes could be running on the local machine. Store proxies in
// a map keyed by the ipc port of the datanode.
// The map maintained by BlockReaderLocal:
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
// Integer:           the DataNode's ipc port
// LocalDatanodeInfo: the local DataNode listening on that port


/**
* This class is used by the datanode to maintain the map from a block
* to its metadata.
*/
class DatanodeBlockInfo {    // the DataNode's metadata for one block (the value side of the block -> metadata map)

  private FSVolume volume;   // the FSVolume holding the block
  private File     file;     // block file
  private boolean detached;  // whether copy-on-write (detach) of the block has completed


// the block -> block-metadata map
HashMap<Block, DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();


  When a block is read locally, the reader first uses localDatanodeInfoMap to determine which DataNode to talk to; it then finds the block's DatanodeBlockInfo in volumeMap (this includes the block's FSVolume, which was decided when the block was stored, as described earlier in this post); a BlockLocalPathInfo object is built from the DatanodeBlockInfo, carrying the block's local paths; finally, BlockReaderLocal uses that BlockLocalPathInfo to read the block. The details are in BlockReaderLocal.newBlockReader().
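  Putting the three structures together, a local read boils down to the lookups below. This is a simplified standalone sketch with made-up, stripped-down types, not the real BlockReaderLocal.newBlockReader() code:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class LocalReadSketch {
  // Hypothetical stand-ins for the real classes, reduced to the fields used here.
  static class BlockLocalPathInfo {
    String localBlockPath;   // where the block file lives on this machine
    String localMetaPath;    // where its checksum file lives
  }
  static class LocalDatanodeInfo {
    // block id -> path info, mirroring LocalDatanodeInfo's cache
    Map<Long, BlockLocalPathInfo> cache = new HashMap<Long, BlockLocalPathInfo>();
  }

  // ipc port -> local DataNode, mirroring BlockReaderLocal.localDatanodeInfoMap
  static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap =
      new HashMap<Integer, LocalDatanodeInfo>();

  static FileInputStream openLocalBlock(int ipcPort, long blockId) throws IOException {
    // 1. pick the local DataNode by its ipc port
    LocalDatanodeInfo dn = localDatanodeInfoMap.get(ipcPort);
    // 2. look up the block's local paths (the real code fetches them from the
    //    DataNode over RPC on a cache miss and caches the BlockLocalPathInfo)
    BlockLocalPathInfo pathInfo = dn.cache.get(blockId);
    // 3. open the block file directly, bypassing the DataNode's data-transfer socket
    return new FileInputStream(new File(pathInfo.localBlockPath));
  }
}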

  This article is based on Hadoop 1.2.1.

  If you spot any mistakes, corrections are welcome.

  When reposting, please credit the source: /article/5582223.html