HDFS(2)
2015-12-08 19:06
This post walks through the HDFS file-read process, as illustrated in the figure above.
1. Reading starts with the FileSystem open method. Since we are dealing with a distributed file system, the class that actually implements open here is DistributedFileSystem; its open method is shown below.
```java
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}
```
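Before going deeper into the internals, here is a minimal client-side sketch of this entry point; the NameNode URI and the file path are placeholder values, not anything from the source walkthrough:

```java
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hdfs://localhost:9000 and /tmp/demo.txt are placeholders
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
    InputStream in = null;
    try {
      // fs is a DistributedFileSystem here, so this call resolves to the
      // open() method shown above and returns an FSDataInputStream
      in = fs.open(new Path("/tmp/demo.txt"));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
```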
open returns an FSDataInputStream. Concretely, it calls resolve, which invokes doCall and, if doCall throws an UnresolvedLinkException (the path contains a symlink), falls back to next on the resolved path. doCall essentially executes a single statement: DFSClient's open method, which returns a DFSInputStream. This input stream obtains a node list from the NameNode, because the NameNode maintains two crucial tables: (1) filename → block sequence and (2) block → machine list. Through the filename the NameNode finds the corresponding sequence of blocks; the blocks themselves are stored on DataNodes, so what the client gets back is essentially a set of block location addresses.
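To make the two mappings concrete, here is a toy model built from plain Java collections; it is only an illustration, since real HDFS keeps this state in INode/BlockInfo structures rather than maps like these:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A toy model of the NameNode's two core tables (not HDFS code).
class ToyNameNode {
  // (1) filename -> ordered block sequence
  private final Map<String, List<Long>> fileToBlocks = new HashMap<>();
  // (2) block id -> DataNodes holding a replica
  private final Map<Long, List<String>> blockToMachines = new HashMap<>();

  // Resolving a file yields, per block, the addresses of its replicas --
  // essentially the "nodelist" a DFSInputStream works from.
  List<List<String>> locate(String filename) {
    List<List<String>> locations = new ArrayList<>();
    List<Long> blocks = fileToBlocks.get(filename);
    if (blocks == null) {
      return locations;
    }
    for (Long blockId : blocks) {
      List<String> machines = blockToMachines.get(blockId);
      locations.add(machines == null
          ? Collections.<String>emptyList() : machines);
    }
    return locations;
  }
}
```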
```java
/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places. Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(this, src, buffersize, verifyChecksum);
}
```
2. Inside DFSClient's open method, the main work is constructing a DFSInputStream. The constructor itself is unremarkable except that it calls openInfo, which fetches the file's block information from the NameNode.
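openInfo is internal, but the same NameNode lookup is visible through the public API. A short sketch, assuming fs.defaultFS points at the cluster and using a placeholder path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Prints which DataNodes host each block of a file -- the same kind of
// information openInfo() pulls from the NameNode for the input stream.
public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus status = fs.getFileStatus(new Path("/tmp/demo.txt")); // placeholder
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation b : blocks) {
      System.out.println("offset=" + b.getOffset()
          + " length=" + b.getLength()
          + " hosts=" + String.join(",", b.getHosts()));
    }
  }
}
```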
```java
/**
 * Read the entire buffer.
 */
@Override
public synchronized int read(final byte buf[], int off, int len)
    throws IOException {
  ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
  return readWithStrategy(byteArrayReader, off, len);
}
```
3. Now look more closely at DFSInputStream. Its read method (above) creates an implementation of the ReaderStrategy interface, which declares a single method, doRead. The strategy used here reads into a byte array (ByteArrayStrategy).
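As a side note, the strategy pattern at work here can be reduced to a small sketch. The names below are simplified inventions; the real ReaderStrategy.doRead() also takes the BlockReader and read statistics:

```java
import java.io.IOException;
import java.io.InputStream;

// Stripped-down illustration of the strategy pattern used by
// DFSInputStream (not the real interface).
interface SimpleReaderStrategy {
  int doRead(InputStream in, int off, int len) throws IOException;
}

// Strategy that copies into a caller-supplied byte array,
// analogous to ByteArrayStrategy.
class SimpleByteArrayStrategy implements SimpleReaderStrategy {
  private final byte[] buf;
  SimpleByteArrayStrategy(byte[] buf) { this.buf = buf; }
  @Override
  public int doRead(InputStream in, int off, int len) throws IOException {
    return in.read(buf, off, len);
  }
}
```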
```java
private int readWithStrategy(ReaderStrategy strategy, int off, int len)
    throws IOException {
  dfsClient.checkOpen();
  if (closed) {
    throw new IOException("Stream closed");
  }
  Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap =
      new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
  failures = 0;
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        // currentNode can be left as null if previous read had a checksum
        // error on the same block. See HDFS-3067
        if (pos > blockEnd || currentNode == null) {
          currentNode = blockSeekTo(pos);
        }
        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
        if (locatedBlocks.isLastBlockComplete()) {
          realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
        }
        int result = readBuffer(strategy, off, realLen, corruptedBlockMap);

        if (result >= 0) {
          pos += result;
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (dfsClient.stats != null) {
          dfsClient.stats.incrementBytesRead(result);
        }
        return result;
      } catch (ChecksumException ce) {
        throw ce;
      } catch (IOException e) {
        if (retries == 1) {
          DFSClient.LOG.warn("DFS Read", e);
        }
        blockEnd = -1;
        if (currentNode != null) {
          addToDeadNodes(currentNode);
        }
        if (--retries == 0) {
          throw e;
        }
      } finally {
        // Check if need to report block replicas corruption either read
        // was successful or ChecksumException occured.
        reportCheckSumFailure(corruptedBlockMap,
            currentLocatedBlock.getLocations().length);
      }
    }
  }
  return -1;
}
```
4. In readWithStrategy (above), the stream first seeks to the address of the block containing the current position; the details of that seek are in blockSeekTo.
```java
/**
 * Open a DataInputStream to a DataNode so that it can be read from.
 * We get block ID and the IDs of the destinations at startup, from the namenode.
 */
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  if (target >= getFileLength()) {
    throw new IOException("Attempted to read past end of file");
  }

  // Will be getting a new BlockReader.
  if (blockReader != null) {
    blockReader.close();
    blockReader = null;
  }

  //
  // Connect to best DataNode for desired Block, with potential offset
  //
  DatanodeInfo chosenNode = null;
  int refetchToken = 1; // only need to get a new access token once
  int refetchEncryptionKey = 1; // only need to get a new encryption key once

  boolean connectFailedOnce = false;

  while (true) {
    //
    // Compute desired block
    //
    LocatedBlock targetBlock = getBlockAt(target, true);
    assert (target==pos) : "Wrong postion " + pos + " expect " + target;
    long offsetIntoBlock = target - targetBlock.getStartOffset();

    DNAddrPair retval = chooseDataNode(targetBlock, null);
    chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;
    StorageType storageType = retval.storageType;

    try {
      ExtendedBlock blk = targetBlock.getBlock();
      Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
      blockReader = new BlockReaderFactory(dfsClient.getConf()).
          setInetSocketAddress(targetAddr).
          setRemotePeerFactory(dfsClient).
          setDatanodeInfo(chosenNode).
          setStorageType(storageType).
          setFileName(src).
          setBlock(blk).
          setBlockToken(accessToken).
          setStartOffset(offsetIntoBlock).
          setVerifyChecksum(verifyChecksum).
          setClientName(dfsClient.clientName).
          setLength(blk.getNumBytes() - offsetIntoBlock).
          setCachingStrategy(cachingStrategy).
          setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
          setClientCacheContext(dfsClient.getClientContext()).
          setUserGroupInformation(dfsClient.ugi).
          setConfiguration(dfsClient.getConfiguration()).
          build();
      if(connectFailedOnce) {
        DFSClient.LOG.info("Successfully connected to " + targetAddr +
                           " for " + blk);
      }
      return chosenNode;
    } catch (IOException ex) {
      if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to " + targetAddr
            + " : " + ex);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
      } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
        refetchToken--;
        fetchBlockAt(target);
      } else {
        connectFailedOnce = true;
        DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
          + ", add to deadNodes and continue. " + ex, ex);
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode);
      }
    }
  }
}
```

5. The point of the whole seek is to find the best replica of the block: because of replication, the same block is stored on several DataNodes, and the best replica is the one on an available DataNode closest to the client. The method also uses the factory pattern, via BlockReaderFactory's builder-style API, to construct a BlockReader matching the request. Once connected to the best DataNode, the actual reading happens in readBuffer, sketched right after the example below.
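The replica-selection idea can be sketched independently of HDFS. This is a hypothetical simplification of chooseDataNode, relying on the fact that the NameNode returns replicas already sorted by distance to the client; the real method also consults network topology and storage types:

```java
import java.io.IOException;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not HDFS code): with replicas pre-sorted by
// distance, the first one not on the dead-node list approximates "best".
class ReplicaChooser {
  static String chooseDataNode(List<String> sortedReplicas,
                               Set<String> deadNodes) throws IOException {
    for (String node : sortedReplicas) {
      if (!deadNodes.contains(node)) {
        return node;
      }
    }
    throw new IOException("Could not obtain block: all replicas dead");
  }
}
```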
```java
/* This is a used by regular read() and handles ChecksumExceptions.
 * name readBuffer() is chosen to imply similarity to readBuffer() in
 * ChecksumFileSystem
 */
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  IOException ioe;

  /* we retry current node only once. So this is set to true only here.
   * Intention is to handle one common case of an error that is not a
   * failure on datanode or client : when DataNode closes the connection
   * since client is idle. If there are other cases of "non-errors" then
   * then a datanode might be retried by setting this to true again.
   */
  boolean retryCurrentNode = true;

  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return reader.doRead(blockReader, off, len, readStatistics);
    } catch ( ChecksumException ce ) {
      DFSClient.LOG.warn("Found Checksum error for "
          + getCurrentBlock() + " from " + currentNode
          + " at " + ce.getPos());
      ioe = ce;
      retryCurrentNode = false;
      // we want to remember which block replicas we have tried
      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
          corruptedBlockMap);
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        DFSClient.LOG.warn("Exception while reading from "
            + getCurrentBlock() + " of " + src + " from "
            + currentNode, e);
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode);
      sourceFound = seekToNewSource(pos);
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}
```

6. readBuffer may invoke the strategy's doRead several times: on a transient IOException it retries the current DataNode once; on a ChecksumException it records the corrupt replica, marks the node dead, and seeks to a new source before trying again.
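From the application side, this seek-and-retry machinery surfaces through FSDataInputStream. A short sketch of seeking and positioned reads, with placeholder path and offsets:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    byte[] buf = new byte[128];
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) { // placeholder
      in.seek(64);          // moves pos; may trigger blockSeekTo on the next read
      int n = in.read(buf, 0, buf.length);
      System.out.println("read " + n + " bytes, now at offset " + in.getPos());
      in.readFully(0, buf); // positioned read: does not move the stream position
    }
  }
}
```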