HDFS(3)
2015-12-10 16:07
1. Continuing the study of the HDFS write path in the source code: the client creates a file through the create method of DistributedFileSystem. The structure is similar to the read path covered in the previous post: the resolve method of a FileSystemLinkResolver is executed, which in turn invokes its doCall method.
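For context, this is a minimal sketch of the client-side code that enters this path; the path, configuration and string written below are illustrative and not taken from this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // With fs.defaultFS pointing at an HDFS namenode, FileSystem.get returns a
    // DistributedFileSystem, whose create() goes through FileSystemLinkResolver.
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) {
      out.writeBytes("hello hdfs");
    }
  }
}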
public FSDataOutputStream doCall(final Path p)
    throws IOException, UnresolvedLinkException {
  final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, cflags,
      replication, blockSize, progress, bufferSize, checksumOpt);
  return dfs.createWrappedOutputStream(dfsos, statistics);
}

2. The main thing doCall does is have the client instance dfs (a DFSClient) call its own create method. That create method is as follows:
/**
 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
 * a hint to where the namenode should place the file blocks.
 * The favored nodes hint is not persisted in HDFS. Hence it may be honored
 * at the creation time only. HDFS could move the blocks during balancing or
 * replication, to move the blocks from favored nodes. A value of null means
 * no favored nodes for this create
 */
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  String[] favoredNodeStrs = null;
  if (favoredNodes != null) {
    favoredNodeStrs = new String[favoredNodes.length];
    for (int i = 0; i < favoredNodes.length; i++) {
      favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":"
          + favoredNodes[i].getPort();
    }
  }
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src,
      masked, flag, createParent, replication, blockSize, progress, buffersize,
      dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
  beginFileLease(result.getFileId(), result);
  return result;
}

3. This create overload mainly deals with favoredNodes, a hint telling the NameNode where it should place the file's blocks; the part to focus on here is the call to newStreamForCreate.
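As a rough illustration of how user code can supply this hint, the sketch below assumes the DistributedFileSystem.create overload that takes an InetSocketAddress[] of favored nodes (present in Hadoop 2.x, though the exact parameter list may differ between versions); the hostnames, ports and other values are made up:

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class FavoredNodesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // Illustrative addresses; they are turned into "host:port" strings exactly
    // as the loop in the create() method above shows.
    InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
        new InetSocketAddress("datanode1.example.com", 50010),
        new InetSocketAddress("datanode2.example.com", 50010)
    };
    try (FSDataOutputStream out = dfs.create(new Path("/tmp/favored.txt"),
        FsPermission.getFileDefault(), true /* overwrite */, 4096,
        (short) 3, 128L * 1024 * 1024, null /* progress */, favoredNodes)) {
      out.writeBytes("placed near the favored nodes, if possible");
    }
  }
}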
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  HdfsFileStatus stat = null;

  // Retry the create if we get a RetryStartFileException up to a maximum
  // number of times
  boolean shouldRetry = true;
  int retryCount = CREATE_RETRY_COUNT;
  while (shouldRetry) {
    shouldRetry = false;
    try {
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize, SUPPORTED_CRYPTO_VERSIONS);
      break;
    } catch (RemoteException re) {
      IOException e = re.unwrapRemoteException(
          AccessControlException.class,
          DSQuotaExceededException.class,
          FileAlreadyExistsException.class,
          FileNotFoundException.class,
          ParentNotDirectoryException.class,
          NSQuotaExceededException.class,
          RetryStartFileException.class,
          SafeModeException.class,
          UnresolvedPathException.class,
          SnapshotAccessControlException.class,
          UnknownCryptoProtocolVersionException.class);
      if (e instanceof RetryStartFileException) {
        if (retryCount > 0) {
          shouldRetry = true;
          retryCount--;
        } else {
          throw new IOException("Too many retries because of encryption"
              + " zone operations", e);
        }
      } else {
        throw e;
      }
    }
  }
  Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  out.start();
  return out;
}

4. In this method, the interesting part is the DFSOutputStream built and returned at the bottom. The call to start immediately suggests a thread, so the next stop is the run method inside DFSOutputStream (it belongs to the DataStreamer inner class).
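Note that the loop above only retries on RetryStartFileException and gives up after CREATE_RETRY_COUNT attempts. The generic sketch below restates that bounded-retry pattern on its own; RetriableException, MAX_RETRIES and callNamenode are illustrative stand-ins, not names from the HDFS source:

public final class BoundedRetry {
  static final int MAX_RETRIES = 10;

  static String createWithRetry(String src) throws Exception {
    int retriesLeft = MAX_RETRIES;
    while (true) {
      try {
        return callNamenode(src);          // the remote call being protected
      } catch (RetriableException e) {
        if (retriesLeft-- == 0) {
          throw new Exception("Too many retries for " + src, e);
        }
        // loop and try again; any other exception would propagate immediately
      }
    }
  }

  static String callNamenode(String src) throws RetriableException {
    return "stat-for-" + src;              // placeholder for the real RPC
  }

  static class RetriableException extends Exception {}
}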
/*
 * streamer thread is the only thread that opens streams to datanode,
 * and closes them. Any error recovery is also done by this thread.
 */
@Override
public void run() {
  long lastPacket = Time.now();
  TraceScope traceScope = null;
  if (traceSpan != null) {
    traceScope = Trace.continueSpan(traceSpan);
  }
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    Packet one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.now();
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                || stage == BlockConstructionStage.DATA_STREAMING
                && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
          long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.now();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          one = dataQueue.getFirst(); // regular data packet
        }
      }
      assert one != null;

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize
            + " is smaller than data size. "
            + " Offset of packet in block " + lastByteOffsetInBlock
            + " Aborting file " + src);
      }

      if (one.lastPacketInBlock) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError
              && ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block
            + " sending packet " + one);
      }

      // write out data to remote datanode
      try {
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      }
      lastPacket = Time.now();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.lastPacketInBlock) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError
              && ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }

        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    }
  }
  if (traceScope != null) {
    traceScope.close();
  }
  closeInternal();
}

5. The streamer thread is responsible for writing data to the DataNodes. This code is not entirely straightforward; it is organized around a few locks on dataQueue. First, the thread waits on dataQueue for a complete packet to send. When a new block is needed, it asks the NameNode for the next block's location, which, because of replication, includes several DataNodes, and builds the write pipeline from them. Before closing a block, the thread waits until ackQueue is empty, meaning every packet has been acknowledged by all nodes in the pipeline. When a packet is actually sent, it is moved from dataQueue to ackQueue, where it stays until its acknowledgements arrive.
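To make the dataQueue/ackQueue interplay easier to follow, here is a minimal, self-contained sketch of the same pattern (not HDFS code; every name in it is illustrative): a sender thread waits on dataQueue for a packet, moves it to ackQueue when it sends it, a responder removes it once acknowledged, and close waits until ackQueue drains.

import java.util.LinkedList;

public class TwoQueueSketch {
  private final LinkedList<String> dataQueue = new LinkedList<>();
  private final LinkedList<String> ackQueue = new LinkedList<>();
  private volatile boolean running = true;

  // Caller side: enqueue a packet and wake up the streamer.
  void enqueue(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll();
    }
  }

  // Streamer side: wait for a packet, "send" it, move it to ackQueue.
  void streamOnce() throws InterruptedException {
    String one;
    synchronized (dataQueue) {
      while (running && dataQueue.isEmpty()) {
        dataQueue.wait(1000);          // like dataQueue.wait(timeout) above
      }
      if (dataQueue.isEmpty()) {
        return;
      }
      one = dataQueue.removeFirst();   // move packet from dataQueue...
      ackQueue.addLast(one);           // ...to ackQueue around the send
      dataQueue.notifyAll();
    }
    System.out.println("sent " + one); // stands in for one.writeTo(blockStream)
  }

  // Responder side: an ack for the oldest in-flight packet removes it.
  void ackOldest() {
    synchronized (dataQueue) {         // both queues guarded by dataQueue's lock
      if (!ackQueue.isEmpty()) {
        ackQueue.removeFirst();
        dataQueue.notifyAll();         // wake anyone waiting for ackQueue to drain
      }
    }
  }

  // Close side: block until every in-flight packet has been acknowledged,
  // mirroring the "wait for acks to arrive from datanodes" loop above.
  void waitForAllAcks() throws InterruptedException {
    synchronized (dataQueue) {
      while (running && !ackQueue.isEmpty()) {
        dataQueue.wait(1000);
      }
    }
  }
}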