Understanding the HDFS Client Write Path Through the Source Code

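2018-01-01 19:06
I previously sketched the basic flow of the HDFS client. To understand it more deeply, I tried stepping through the source code.
First, prepare some demo code and set a breakpoint at fs.copyFromLocalFile().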
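import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

public class HdfsClientDemo {
    FileSystem fs = null;
    Configuration conf = null;

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        // The URI and the user identity can be passed in directly
        fs = FileSystem.get(new URI("hdfs://hadoop001:9000"), conf, "hadoop");
    }

    /**
     * Upload a file.
     * @throws Exception
     */
    @Test
    public void testUpload() throws Exception {
        // Remove any previous copy so the upload starts from a clean state
        if (fs.exists(new Path("/apollo_update.log"))) {
            fs.delete(new Path("/apollo_update.log"), true);
        }
        fs.copyFromLocalFile(new Path("c:/apollo_update.log"), new Path("/apollo_update.log"));
        fs.close();
    }
}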
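FileUtil.copy copies files between file systems. As the code below shows, if the source is a directory, it is copied recursively. For a regular file, the key lines are these two: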
        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);

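Here a file is opened from the local file system, and the destination file system, which is a DistributedFileSystem, returns an FSDataOutputStream from its create method. Data is written to the DataNodes through that stream. DistributedFileSystem also holds a member variable dfs of type DFSClient, which communicates with the NameNode over RPC.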
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
Path src = srcStatus.getPath();
dst = checkDest(src.getName(), dstFS, dst, overwrite);
if (srcStatus.isDirectory()) {
checkDependencies(srcFS, src, dstFS, dst);
if (!dstFS.mkdirs(dst)) {
return false;
}
FileStatus contents[] = srcFS.listStatus(src);
for (int i = 0; i < contents.length; i++) {
copy(srcFS, contents[i], dstFS,
new Path(dst, contents[i].getPath().getName()),
deleteSource, overwrite, conf);
}
} else {
InputStream in=null;
OutputStream out = null;
try {
in = srcFS.open(src);
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
} catch (IOException e) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
throw e;
}
}
if (deleteSource) {
return srcFS.delete(src, true);
} else {
return true;
}

}
The DFSOutputStream instance is constructed by the following code.

public DFSOutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
String[] favoredNodeStrs = null;
if (favoredNodes != null) {
favoredNodeStrs = new String[favoredNodes.length];
for (int i = 0; i < favoredNodes.length; i++) {
favoredNodeStrs[i] =
favoredNodes[i].getHostName() + ":"
+ favoredNodes[i].getPort();
}
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
favoredNodeStrs);
beginFileLease(result.getFileId(), result);
return result;
}
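dfsClient.namenode.create() is an RPC call to the NameNode that requests creation of the file to be uploaded.

DFSOutputStream has a member variable streamer of type DataStreamer, which runs a thread that sends the file data over the network to HDFS. Once the stream instance has been created, its start method is called, which starts that thread. The pipeline itself is set up later, inside the thread's run method.

streamer also holds a member variable response of type ResponseProcessor, which runs a thread that handles the ack packets returned by the DataNodes.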

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException {
HdfsFileStatus stat = null;

// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
out.start();
return out;
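}

Next, step into IOUtils.copyBytes(in, out, conf, true). It reads the parameter io.file.buffer.size, the size of the buffer used when reading the file, which defaults to 4096 bytes. That seems small to me. Would it be better to set it to the size of one packet?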
public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
throws IOException {
copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
}

public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}
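
To get a feel for what the buffer size changes in this loop, here is a minimal sketch that uses only in-memory java.io streams rather than Hadoop. The class CopyLoopDemo and its countWrites helper are hypothetical names introduced for illustration. It only counts how many write calls the loop issues for a given buffer size; it says nothing about real HDFS throughput, because the output stream re-buffers the data into chunks and packets anyway.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of Hadoop.
final class CopyLoopDemo {

    // Same loop shape as the copyBytes(in, out, buffSize) listing above;
    // additionally returns the number of write calls it made.
    static int copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException {
        byte[] buf = new byte[buffSize];
        int writes = 0;
        int bytesRead = in.read(buf);
        while (bytesRead >= 0) {
            out.write(buf, 0, bytesRead);
            writes++;
            bytesRead = in.read(buf);
        }
        return writes;
    }

    // Copies totalBytes zero bytes between in-memory streams and reports the write count.
    static int countWrites(int totalBytes, int buffSize) {
        try {
            return copyBytes(new ByteArrayInputStream(new byte[totalBytes]),
                    new ByteArrayOutputStream(), buffSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int oneMiB = 1024 * 1024;
        for (int size : new int[] {4096, 65536}) {
            System.out.println("buffer=" + size + " writes=" + countWrites(oneMiB, size));
        }
    }
}
```

For 1 MiB of input, a 4096-byte buffer leads to 256 write calls and a 65536-byte buffer to 16. In a real client the buffer would be changed through the io.file.buffer.size setting shown in the listing above.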
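DFSOutputStream also has its own buffer. When that buffer fills up (or when the local buffer is empty and the caller's buffer is at least as large), the data is handed to writeChecksumChunks.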

private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length;
writeChecksumChunks(b, off, length);
return length;
}

// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
// local buffer is full
flushBuffer();
}
return bytesToCopy;
}
writeChecksumChunks in turn calls writeChunk.
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
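}

Because the copy loop iterates so many times, it is easier to set the breakpoint at out.close() to avoid getting stuck in the while loop. That call also ends up in writeChunk.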
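
The per-chunk loop in writeChecksumChunks can be illustrated with a small standalone sketch. It uses java.util.zip.CRC32 as a stand-in for Hadoop's checksum object (an assumption for illustration; the checksum type actually used by HDFS may differ), and the class name ChunkChecksumDemo is hypothetical. It splits a buffer into chunks of bytesPerChecksum bytes and computes one checksum per chunk, with the last chunk possibly shorter than the rest.

```java
import java.util.zip.CRC32;

// Hypothetical demo class, not part of Hadoop.
final class ChunkChecksumDemo {

    // Returns one CRC32 value per chunk of b[off .. off+len).
    static long[] chunkChecksums(byte[] b, int off, int len, int bytesPerChecksum) {
        int chunks = (len + bytesPerChecksum - 1) / bytesPerChecksum; // round up
        long[] sums = new long[chunks];
        CRC32 crc = new CRC32();
        for (int i = 0; i < len; i += bytesPerChecksum) {
            // The final chunk may be shorter than bytesPerChecksum.
            int chunkLen = Math.min(bytesPerChecksum, len - i);
            crc.reset();
            crc.update(b, off + i, chunkLen);
            sums[i / bytesPerChecksum] = crc.getValue();
        }
        return sums;
    }

    public static void main(String[] args) {
        // 1300 bytes = 2 full chunks of 512 bytes + 1 partial chunk of 276 bytes
        byte[] data = new byte[1300];
        long[] sums = chunkChecksums(data, 0, data.length, 512);
        System.out.println("chunks=" + sums.length);
        for (long sum : sums) {
            System.out.println(Long.toHexString(sum));
        }
    }
}
```

The index arithmetic mirrors the listing above: the chunk index is i / bytesPerChecksum, and the chunk length is capped by the bytes that remain.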
writeChunk contains an important call, waitAndQueueCurrentPacket.
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();

if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
getChecksumSize() + " but found to be " + cklen);
}

if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + bytesCurBlock);
}
}

currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;

// If packet is full, enqueue it for transmission
//
if (currentPacket.numChunks == currentPacket.maxChunks ||
bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.seqno +
", src=" + src +
", bytesCurBlock=" + bytesCurBlock +
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
waitAndQueueCurrentPacket();

// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumBufSize();
}

if (!appendChunk) {
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
}
}
}
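
The listing above relies on packetSize and chunksPerPacket, which are recomputed by computePacketChunkSize. A rough sketch of that kind of calculation follows: each chunk takes bytesPerChecksum bytes of data plus its checksum bytes, and the packet budget, minus room for a header, is divided by that size. The class PacketMath is hypothetical, and the header length of 33 bytes is an assumption for illustration only; the real constant lives in the Hadoop source and may differ between versions.

```java
// Hypothetical helper, not part of Hadoop.
final class PacketMath {

    // Number of chunks that fit into one packet; at least one chunk is always allowed.
    static int chunksPerPacket(int packetSize, int bytesPerChecksum, int checksumSize, int maxHeaderLen) {
        int chunkSize = bytesPerChecksum + checksumSize; // data bytes + checksum bytes
        int bodySize = packetSize - maxHeaderLen;        // room left after the header
        return Math.max(bodySize / chunkSize, 1);
    }

    public static void main(String[] args) {
        int packetSize = 64 * 1024;  // 64K packet budget
        int bytesPerChecksum = 512;  // chunk size
        int checksumSize = 4;        // bytes per checksum, e.g. a 32-bit CRC
        int maxHeaderLen = 33;       // ASSUMPTION: illustrative header length

        int chunks = chunksPerPacket(packetSize, bytesPerChecksum, checksumSize, maxHeaderLen);
        System.out.println("chunksPerPacket=" + chunks);
        System.out.println("dataBytesPerPacket=" + chunks * bytesPerChecksum);
    }
}
```

Under these assumptions a 64K packet carries 126 chunks, that is 64512 bytes of file data, with the remainder taken by checksums and the header.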

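This puts a full packet onto the queue, ready to be sent, and wakes up the waiting thread, which should be the transfer thread that DFSOutputStream created through DataStreamer.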

private void queueCurrentPacket() {
synchronized (dataQueue) {
if (currentPacket == null) return;
dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.seqno;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
}
currentPacket = null;
dataQueue.notifyAll();
}
}

Next, find the run method of the DataStreamer class and set a breakpoint there to verify this. Sure enough, the breakpoint is hit.
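
The hand-off between the writing thread and the streamer thread is a classic wait/notify producer-consumer pattern on dataQueue. Below is a heavily simplified sketch of it, with integers standing in for packets. The class QueueHandoffDemo and its method names are hypothetical, and it leaves out everything the real DataStreamer does around acks, heartbeats, and error handling.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical demo class, not part of Hadoop.
final class QueueHandoffDemo {
    private final LinkedList<Integer> dataQueue = new LinkedList<>();
    private final List<Integer> sent = new ArrayList<>();
    private boolean closed = false; // guarded by dataQueue

    // Producer side: same shape as queueCurrentPacket(), enqueue then notify.
    void queuePacket(int seqno) {
        synchronized (dataQueue) {
            dataQueue.addLast(seqno);
            dataQueue.notifyAll();
        }
    }

    // Signals that no more packets will be queued.
    void close() {
        synchronized (dataQueue) {
            closed = true;
            dataQueue.notifyAll();
        }
    }

    // Consumer side: waits for packets and "sends" them in order.
    void streamerLoop() throws InterruptedException {
        while (true) {
            int one;
            synchronized (dataQueue) {
                while (dataQueue.isEmpty() && !closed) {
                    dataQueue.wait(1000);
                }
                if (dataQueue.isEmpty()) {
                    return; // closed and fully drained
                }
                one = dataQueue.removeFirst();
            }
            sent.add(one); // stand-in for writing the packet to the socket
        }
    }

    // Queues the given number of packets and returns them in the order they were sent.
    static List<Integer> run(int packets) {
        QueueHandoffDemo demo = new QueueHandoffDemo();
        Thread streamer = new Thread(() -> {
            try {
                demo.streamerLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "streamer");
        streamer.start();
        for (int i = 0; i < packets; i++) {
            demo.queuePacket(i);
        }
        demo.close();
        try {
            streamer.join(); // after join, reading demo.sent is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.sent;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The consumer re-checks its condition in a loop around wait, as the real run method does, so a spurious wakeup or a timeout simply causes another check.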

Stepping into nextBlockOutputStream, which is called from run, shows that it calls locateFollowingBlock.
public void run() {
long lastPacket = Time.now();
TraceScope traceScope = null;
if (traceSpan != null) {
traceScope = Trace.continueSpan(traceSpan);
}
while (!streamerClosed && dfsClient.clientRunning) {

// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}

Packet one;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
}

synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.now();
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.now();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
one = dataQueue.getFirst(); // regular data packet
}
}
assert one != null;

// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery();
initDataStreaming();
}

long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
lastByteOffsetInBlock +
" Aborting file " + src);
}

if (one.lastPacketInBlock) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}

// send the packet
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}

if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DataStreamer block " + block +
" sending packet " + one);
}

// write out data to remote datanode
try {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
}
lastPacket = Time.now();

// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}

if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}

// Is this block full?
if (one.lastPacketInBlock) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}

endBlock();
}
if (progress != null) { progress.progress(); }

// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DataStreamer Exception", e);
}
if (e instanceof IOException) {
setLastException((IOException)e);
} else {
setLastException(new IOException("DataStreamer Exception: ",e));
}
hasError = true;
if (errorIndex == -1 && restartingNodeIndex == -1) {
// Not a datanode issue
streamerClosed = true;
}
}
}
if (traceScope != null) {
traceScope.close();
}
closeInternal();
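}

And finally, here is the line we were looking for: dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);

This is an RPC call to the NameNode that returns the DataNode information. With that information in hand, the data can be sent out over the network.

private LocatedBlock locateFollowingBlock(long start,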
DatanodeInfo[] excludedNodes) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
long localstart = Time.now();
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
block, excludedNodes, fileId, favoredNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
if (ue != e) {
throw ue; // no need to retry these exceptions
}

if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
if (retries == 0) {
throw e;
} else {
--retries;
DFSClient.LOG.info("Exception while adding a block", e);
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Waiting for replication for "
+ (Time.now() - localstart) / 1000
+ " seconds");
}
try {
DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+ " retries left " + retries);
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
DFSClient.LOG.warn("Caught exception ", ie);
}
}
} else {
throw e;
}

}
}
}
}

ExtendedBlock getBlock() {
return block;
}

DatanodeInfo[] getNodes() {
return nodes;
}

Token<BlockTokenIdentifier> getBlockToken() {
return accessToken;
}

private void setLastException(IOException e) {
lastException.compareAndSet(null, e);
}
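}

Following the calls further down, a SocketOutputStream is eventually constructed, and the data is sent to the DataNode via NIO.

DataStreamer has a member blockStream of type DataOutputStream, which wraps an underlying member variable out whose type is SocketOutputStream.

A doIO method is then called, and the data is sent to HDFS over the socket via NIO.

void waitForIO(int ops) throws IOException {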

if (selector.select(channel, ops, timeout) == 0) {
throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
ops));
}
}

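Summary

1. Uploading a file to HDFS is implemented mainly through the DFSOutputStream obtained from DistributedFileSystem.

2. The more important member variables of DFSOutputStream are:

private final LinkedList<Packet> dataQueue = new LinkedList<Packet>(); // queue of Packets waiting to be sent
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); // queue of sent Packets waiting for acks from the DataNodes
private DataStreamer streamer; // runs the thread that sends Packets to HDFS via NIO

3. The important member variables of DataStreamer are:

private DataOutputStream blockStream; // wraps a SocketOutputStream; sends Packets
private DataInputStream blockReplyStream; // receives the ack packets returned by the DataNodes
private ResponseProcessor response = null; // runs a thread that deserializes the acks returned by the DataNodes and verifies them

4. The first call to the NameNode does not return a DataNode list; it only tells the NameNode that a file is about to be uploaded. The client writes file data into the buffer in DFSOutputStream. When the buffer is full, the DataStreamer thread is woken up; it contacts the NameNode again to obtain the DataNode list and then sends the file data to the DataNodes. Meanwhile the DataNodes return ack packets, which are processed by the ResponseProcessor thread. So reading data, uploading data, and processing acks each have their own thread.

5. File data is transferred in units of Packets. A Packet is 64K by default and contains multiple chunks. A chunk is the unit of checksum verification and is 512 bytes by default.

6. By design, HDFS writes the data of a block through a pipeline: the data is split into packets, and if three replicas are required, to be written to DataNode 1, 2, and 3, the process is as follows:
First, packet 1 is written to DataNode 1.
Then DataNode 1 forwards packet 1 to DataNode 2, while the client can write packet 2 to DataNode 1.
Then DataNode 2 forwards packet 1 to DataNode 3, while the client can write packet 3 to DataNode 1 and DataNode 1 forwards packet 2 to DataNode 2.
The packets are passed down the line one after another in this way until all the data has been written and replicated.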
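
The pipelining described in the last point can be made concrete with a toy time-step model. It assumes, purely for illustration, that every hop (client to DataNode 1, DataNode 1 to DataNode 2, and so on) takes exactly one step and it ignores acks entirely. The class PipelineDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Hadoop.
final class PipelineDemo {

    // Step at which a packet (1-based) reaches a node (1-based), assuming one hop per step.
    static int arrivalStep(int packet, int node) {
        return packet + node - 1;
    }

    // Number of steps until the last packet has reached the last node.
    static int totalSteps(int packets, int nodes) {
        return packets + nodes - 1;
    }

    // For each step, lists which packet arrives at which DataNode.
    static List<String> schedule(int packets, int nodes) {
        List<String> lines = new ArrayList<>();
        for (int step = 1; step <= totalSteps(packets, nodes); step++) {
            StringBuilder sb = new StringBuilder("step " + step + ":");
            for (int node = 1; node <= nodes; node++) {
                int packet = step - node + 1; // inverse of arrivalStep
                if (packet >= 1 && packet <= packets) {
                    sb.append(" packet ").append(packet)
                      .append(" -> DataNode ").append(node).append(';');
                }
            }
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        schedule(3, 3).forEach(System.out::println);
    }
}
```

With 3 packets and 3 DataNodes this model needs 5 steps, compared with 9 if each packet had to reach all three nodes before the next one could start.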

[From @若泽大数据]