HDFS读写数据块--${dfs.data.dir}选择策略
2014-11-26 18:27
645 查看
最近工作需要,看了HDFS读写数据块这部分。不过可能跟网上大部分帖子不一样,本文主要写了${dfs.data.dir}的选择策略,也就是block在DataNode上的放置策略。我主要是从我们工作需要的角度来读这部分代码的。
所谓${dfs.data.dir}的选择策略,就是当DataNode配置有多个${dfs.data.dir}目录时(如上面的配置),该选择哪个目录来存放block。一般多个硬盘分别挂载到不同的${dfs.data.dir}下,所以存储block是要决定block该放到哪个磁盘上。
创建文件总共有两步:
1、在写block之前,需要与NameNode通信来生成文件(INodeFile、INodeFileUnderConstruction)。首先在DFSClient端的create()方法中发起写请求,然后通过RPC由NameNode最终调用FSNameSystem的startFileInternal()方法来创建文件。
getAdditionalBlock
上面的方法还涉及到了块的选择策略,这个留在下一篇再说。下面这个图来总结下上面方法的调用层次:
最后重点说一下block在DataNode上的存储策略。其调度层次如下:
首先说一下其中涉及到的数据结构:
FSVolumeSet 代表多个${dfs.data.dir}目录的集合,它将这些目录组织成一个数组volumes,然后用curVolume来指示当前正在使用的是哪个${dfs.data.dir}目录。${dfs.data.dir}的选择策略如下:
当有多个${dfs.data.dir}时,DataNode顺序地从volumes选择一个FSVolume用来存放block(先放在blocksBeingWritten目录下,写完后再转移到current目录下);
每次写完一个block, curVolume增1。以此实现多个${dfs.data.dir}目录的轮流写。该策略在FSDataSet.FSVolumeSet的getNextVolume()方法中实现。
接着来说一下读block的过程。在Map Task执行时,nextKeyValue()方法来从block中读取数据,主要步骤如下:
1、根据创建Map Task时指定的文件偏移量和长度,来确定应该读取哪个block,并获取这个block的详细信息。(与NameNode有一次通信)。
2、根据block所在的DataNode,选择一个最好的DN,并建立与该DN的socket连接(默认不启用本地读)。
其方法的调用层次如下:
Map Task读取数据是由RecordReader类来完成的。它是个接口,有两个子类:
BlockReaderLocal:读取本地block(不通过DataNode)
RemoteBlockReader:读取远程block(通过DataNode)
Map Task在读取数据时,即使是本地数据也是使用RemoteBlockReader来读的,也就是通过socket,默认不开启本地读。通过这个链接的方法可以开启本地读(http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html),也就是使用BlockReaderLocal直接来从本地读block,而不通过DataNode。以下的分析都是基于BlockReaderLocal来完成的。
先说一下涉及到的数据结构:
在读block时,首先根据localDatanodeInfoMap确定要访问的DataNode;然后从volumeMap中找到block对应的DatanodeBlockInfo信息(这其中就包括block对应的FSVolume,这是在存储block时确定的。本文前边有写);然后根据DatanodeBlockInfo来构造BlockLocalPathInfo对象,将block的相关信息存放到BlockLocalPathInfo对象中。最后BlockReaderLocal根据BlockLocalPathInfo对象来读取相应的block。 具体在BlockReaderLocal.newBlockReader()方法中。
本文基于hadoop1.2.1
如有错误,还请指正
转载请注明出处:/article/5582223.html
hdfs-site.xml <property> <name>dfs.data.dir</name> <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value> </property>
所谓${dfs.data.dir}的选择策略,就是当DataNode配置有多个${dfs.data.dir}目录时(如上面的配置),该选择哪个目录来存放block。一般多个硬盘分别挂载到不同的${dfs.data.dir}下,所以存储block是要决定block该放到哪个磁盘上。
创建文件总共有两步:
1、在写block之前,需要与NameNode通信来生成文件(INodeFile、INodeFileUnderConstruction)。首先在DFSClient端的create()方法中发起写请求,然后通过RPC由NameNode最终调用FSNameSystem的startFileInternal()方法来创建文件。
/** * The client would like to obtain an additional block for the indicated * filename (which is being written-to). Return an array that consists * of the block, plus a set of machines. The first on this list should * be where the client writes data. Subsequent items in the list must * be provided in the connection to the first datanode. * * Make sure the previous blocks have been reported by datanodes and * are replicated. Will return an empty 2-elt array if we want the * client to "try again later". */ //向NameNode申请block public LocatedBlock getAdditionalBlock(String src, String clientName, HashMap<Node, Node> excludedNodes ) throws IOException { long fileLength, blockSize; int replication; DatanodeDescriptor clientNode = null; Block newBlock = null; NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: " +src+" for "+clientName); synchronized (this) { if (isInSafeMode()) {//check safemode first for failing-fast throw new SafeModeException("Cannot add block to " + src, safeMode); } // have we exceeded the configured limit of fs objects. checkFsObjectLimit(); INodeFileUnderConstruction pendingFile = checkLease(src, clientName); // // If we fail this, bad things happen! // if (!checkFileProgress(pendingFile, false)) { throw new NotReplicatedYetException("Not replicated yet:" + src); } fileLength = pendingFile.computeContentSummary().getLength(); blockSize = pendingFile.getPreferredBlockSize(); clientNode = pendingFile.getClientNode(); replication = (int)pendingFile.getReplication(); } // choose targets for the new block to be allocated. //选择副本存放的位置 DatanodeDescriptor targets[] = replicator.chooseTarget(src, replication, clientNode, excludedNodes, blockSize); if (targets.length < this.minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes, instead of " + minReplication); } // Allocate a new block and record it in the INode. synchronized (this) { if (isInSafeMode()) { //make sure it is not in safemode again. throw new SafeModeException("Cannot add block to " + src, safeMode); } INode[] pathINodes = dir.getExistingPathINodes(src); int inodesLen = pathINodes.length; checkLease(src, clientName, pathINodes[inodesLen-1]); INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) pathINodes[inodesLen - 1]; if (!checkFileProgress(pendingFile, false)) { throw new NotReplicatedYetException("Not replicated yet:" + src); } // allocate new block record block locations in INode. //分配block,并随机生成一个不重复的blockID,然后在INode中记录该block newBlock = allocateBlock(src, pathINodes); pendingFile.setTargets(targets); for (DatanodeDescriptor dn : targets) { dn.incBlocksScheduled(); } dir.persistBlocks(src, pendingFile); } if (persistBlocks) { getEditLog().logSync(); } // Create next block LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength); if (isAccessTokenEnabled) { b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE))); } return b; }
getAdditionalBlock
上面的方法还涉及到了块的选择策略,这个留在下一篇再说。下面这个图来总结下上面方法的调用层次:
最后重点说一下block在DataNode上的存储策略。其调度层次如下:
首先说一下其中涉及到的数据结构:
class FSVolume { //卷信息,代表${dfs.data.dir} private File currentDir; //存放block,即${dfs.data.dir}/current private FSDir dataDir; //表示currentDir有哪些块文件 private File tmpDir; //存放一些临时文件,即${dfs.data.dir}/tmp private File blocksBeingWritten; //放置正在写的block,即${dfs.data.dir}/ blocksBeingWritten private File detachDir; //是否写分离,即${dfs.data.dir}/detach private DF usage; private DU dfsUsage; private long reserved;
static class FSVolumeSet { //卷信息集合,代表多个${dfs.data.dir} FSVolume[] volumes = null; //代表多个FSVolume,并将其组织成一个数组 int curVolume = 0; //指示当前正在使用哪一个FSVolume
FSVolumeSet 代表多个${dfs.data.dir}目录的集合,它将这些目录组织成一个数组volumes,然后用curVolume来指示当前正在使用的是哪个${dfs.data.dir}目录。${dfs.data.dir}的选择策略如下:
当有多个${dfs.data.dir}时,DataNode顺序地从volumes选择一个FSVolume用来存放block(先放在blocksBeingWritten目录下,写完后再转移到current目录下);
每次写完一个block, curVolume增1。以此实现多个${dfs.data.dir}目录的轮流写。该策略在FSDataSet.FSVolumeSet的getNextVolume()方法中实现。
synchronized FSVolume getNextVolume(long blockSize) throws IOException { if(volumes.length < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } // since volumes could've been removed because of the failure // make sure we are not out of bounds if(curVolume >= volumes.length) { curVolume = 0; } int startVolume = curVolume; while (true) { FSVolume volume = volumes[curVolume]; curVolume = (curVolume + 1) % volumes.length; //增1 if (volume.getAvailable() > blockSize) { return volume; } if (curVolume == startVolume) { throw new DiskOutOfSpaceException("Insufficient space for an additional block"); } } }
接着来说一下读block的过程。在Map Task执行时,nextKeyValue()方法来从block中读取数据,主要步骤如下:
1、根据创建Map Task时指定的文件偏移量和长度,来确定应该读取哪个block,并获取这个block的详细信息。(与NameNode有一次通信)。
2、根据block所在的DataNode,选择一个最好的DN,并建立与该DN的socket连接(默认不启用本地读)。
其方法的调用层次如下:
Map Task读取数据是由RecordReader类来完成的。它是个接口,有两个子类:
BlockReaderLocal:读取本地block(不通过DataNode)
RemoteBlockReader:读取远程block(通过DataNode)
Map Task在读取数据时,即使是本地数据也是使用RemoteBlockReader来读的,也就是通过socket,默认不开启本地读。通过这个链接的方法可以开启本地读(http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html),也就是使用BlockReaderLocal直接来从本地读block,而不通过DataNode。以下的分析都是基于BlockReaderLocal来完成的。
先说一下涉及到的数据结构:
public class BlockLocalPathInfo implements Writable { //用来描述block的位置信息 private Block block; //特定的块文件 private String localBlockPath = ""; //块文件的本地存储路径 private String localMetaPath = ""; //块校验文件的本地存储路径
//Stores the cache and proxy for a local datanode. private static class LocalDatanodeInfo { //代表本机上的某个DataNode(一个机器上可能运行多个DataNode) private final Map<Block, BlockLocalPathInfo> cache; //其中维护的表(block-->block位置信息)
// Multiple datanodes could be running on the local machine. Store proxies in // a map keyed by the ipc port of the datanode. //BlockReaderLocal中维护的表: private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>(); // Integer:表示端口号 // LocalDatanodeInfo:表示某个DataNode
/** * This class is used by the datanode to maintain the map from a block * to its metadata. */ class DatanodeBlockInfo { //表示该DN上的所有block信息(block-->block元信息) private FSVolume volume; //block所在的FSVolume private File file; // block file private boolean detached; // block的写复制是否完成
//block与block元信息映射表 HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();;
在读block时,首先根据localDatanodeInfoMap确定要访问的DataNode;然后从volumeMap中找到block对应的DatanodeBlockInfo信息(这其中就包括block对应的FSVolume,这是在存储block时确定的。本文前边有写);然后根据DatanodeBlockInfo来构造BlockLocalPathInfo对象,将block的相关信息存放到BlockLocalPathInfo对象中。最后BlockReaderLocal根据BlockLocalPathInfo对象来读取相应的block。 具体在BlockReaderLocal.newBlockReader()方法中。
本文基于hadoop1.2.1
如有错误,还请指正
转载请注明出处:/article/5582223.html
相关文章推荐
- ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: All directories in dfs.data.dir are invalid
- org.apache.hadoop.hdfs.server.datanode.DataNode: Invalid dfs.datanode.data.dir /chunk : java.io.Fil
- hdfs硬盘中dfs.data.dir相关和一些说明
- hdfs dfsadmin -reconfig reload datanode dfs.datanode.data.dir without restart datanode
- dfs.name.dir与dfs.data.dir
- hadoop hdfs 读写错误解决:java.io.IOException: Filesystem closed org.apache.hadoop.hdfs.DFSClient.checkOpe
- HDFS-datanode数据块部分笔记
- Hadoop Datanode节点无法启动(All directories in dfs.data.dir are invalid)
- Hadoop-2.4.1源码分析--HDFS HeartBeat(心跳检测)之DataNode端数据块增量汇报
- hadoop datanode启动失败(All directories in dfs.data.dir are invalid)
- Hadoop HDFS (3) JAVA访问HDFS之二 文件分布式读写策略
- HDFS读取副本的选择策略
- HDFS读取副本的选择策略
- 关于hadoop中datanode节点不同的dfs.data.dir之间数据均衡问题
- ${mapred.local.dir}选择策略--Map Task存放中间结果
- hadoop datanode启动失败(All directories in dfs.data.dir are invalid)
- Hadoop Datanode节点无法启动(All directories in dfs.data.dir are invalid)
- 聊聊HDFS中的副本放置策略和磁盘选择策略间的选择“矛盾”
- HDFS 副本存放磁盘选择策略详解
- hdfs haadmin使用,DataNode动态上下线,NameNode状态切换管理,数据块的balance,HA下hdfs-api变化(来自学习资料)