您的位置:首页 > Web前端 > Node.js

HDFS1.0源代码解析—DataNode数据接收线程DataXceiverServer与DataXceiver解析

2012-07-18 21:45 567 查看
在介绍DataXceiverServer线程之前首先介绍一下,DN启动后一直执行的线程(也就是服务线程)哪些。首选DN本身就是一个线程类,该线程必然会一直执行。在DN的run方法中

1432     // start dataXceiveServer
1433     dataXceiverServer.start();
1434     ipcServer.start();
可以看出会启动一个DataXceiverServer类型的线程,这个线程的主要作用是什么呢,这就是本篇博客要讲述的重点。另外我们还可以看到ipcServer的线程,这个就是前边博客中介绍的RPC机制中的Server线程,负责处理RPC请求,这些请求来在DN之间,貌似在进行recovery的时候使用,具体没有搞清楚。

下边开始DataXceiverServer线程类的主要功能,

43 class DataXceiverServer implements Runnable, FSConstants {
46   ServerSocket ss;
47   DataNode datanode;
看以看出DataXceiverServer也是一个线程类,其中两个比较重要的成员变量是一个ServerSocket对象和DataNode对象本身,其中ServerSocket就是DN负责接收socket连接的对象。我们来看一下这个类最核心的部分,也就是run方法

128   public void run() {
129     while (datanode.shouldRun) {
130       try {
131         Socket s = ss.accept();
132         s.setTcpNoDelay(true);
133         new Daemon(datanode.threadGroup,
134             new DataXceiver(s, datanode, this)).start();
从代码中可以看出DataXceiverServer没接收一个socket连接就会启动一个DataXceiver的线程来处理这一个socket的连接。

这样我们就清晰了DataXceiverServer只是充当了一个接收任务和分配任务的决策,具体实现是在DataXceiver中负责完成的,这又一次体现了模块化设计的思想。从注释中开一看出DataXceiverServer使用一个变量maxXceiverCount控制socket连接诶的数目。

下面开始介绍真正工作的DataXceiver类的主要功能,当然我们已经知道DataXceiver是一个线程类,当然我们最先要关注的还是它的run方法

78   public void run() {
79     DataInputStream in=null;
80     try {
81       in = new DataInputStream(
82           new BufferedInputStream(NetUtils.getInputStream(s),
83                                   SMALL_BUFFER_SIZE));
84       short version = in.readShort();
85       if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
86         throw new IOException( "Version Mismatch" );
87       }
88       boolean local = s.getInetAddress().equals(s.getLocalAddress());
89       byte op = in.readByte();

98       switch ( op ) {
99       case DataTransferProtocol.OP_READ_BLOCK:
100         readBlock( in );

107       case DataTransferProtocol.OP_WRITE_BLOCK:
108         writeBlock( in );

115       case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
116         replaceBlock(in);

119       case DataTransferProtocol.OP_COPY_BLOCK:
120             // for balancing purpose; send to a proxy source
121         copyBlock(in);

124       case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
125         getBlockChecksum(in);
可以看出run方法的主要功能是读取请求的类型,根据类型指相应的操作,从代码中可以看出有五种类型的需求。按个来进行分析,首选来看readBlock,从函数名可以看出这是处理客户端或者其他DN的读取请求的函数

150     long blockId = in.readLong();
151     Block block = new Block( blockId, 0 , in.readLong());
152
153     long startOffset = in.readLong();
154     long length = in.readLong();
155     String clientName = Text.readString(in);

179     BlockSender blockSender = null;

188       try {
189         blockSender = new BlockSender(block, startOffset, length,
190             true, true, false, datanode, clientTraceFmt);

196       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
197       long read = blockSender.sendBlock(out, baseStream, null); // send data
首先获取要读取的blockId,因为可能一次只读取block中某一部分的数据,所以需要读取内容的便宜和长度。真正读取过程是由BlockSender完成的,该类下一篇博客中会与BlockReceiver类一起介绍

再看writeBlock,这是处理客户端或者其他DN写入数据请求的函数

266     //获取blockid和generationstamp
267     Block block = new Block(in.readLong(),
268         dataXceiverServer.estimateBlockSize, in.readLong());

272     int pipelineSize = in.readInt(); // num of datanodes in entire pipeline

274     boolean isRecovery = in.readBoolean(); // is this part of recovery?
275     String client = Text.readString(in); // working on behalf of this client
276     boolean hasSrcDataNode = in.readBoolean(); // is src node info present

281     int numTargets = in.readInt();

315     DataOutputStream mirrorOut = null;  // stream to next target
316     DataInputStream mirrorIn = null;    // reply from next target
317     Socket mirrorSock = null;           // socket to next target
318     BlockReceiver blockReceiver = null; // responsible for data handling

324       blockReceiver = new BlockReceiver(block, in,
325           s.getRemoteSocketAddress().toString(),
326           s.getLocalSocketAddress().toString(),
327           isRecovery, client, srcDataNode, datanode);

429       blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
430                                  mirrorAddr, null, targets.length);
这是列出了一些核心的代码,这里需要说明一下HDFS数据写入的方式,因为HDFS采用了冗余备份的机制,所以一个block一般要写入到3个DN上,HDFS采用的使用流水线的写入机制,例如要将数据写入到A、B、C三个节点上,首先就建立起A->B->C的流水线,数据线写入到节点A,A在将数据写入到节点B,B再将数据写入到节点C,最后C将写入是否状态返回B,B返回A,A返回客户端,写入完成。

所以需要发送流水线长度(也就是要写入的节点的数目),mirrorIn、mirrorOut是流水线中节点与其他上游和下游节点交互的流。其中代码的大部分是用来建立数据的流水线,因为在写入数据之前需要保证这条流水线是打通的,所以每节点都将头信息发送给下游节点,最后一个节点除外。如成功一次向上游发送确认信息,建立起流水线,再进行数据传输。最终数据传输使用的是BlockReceiver类的receiveBlock方法。

getBlockChecksum方法是一个简单方法,主要是返回一个block的检验信息。

copyBlock与writeBlock很相似,主要的区别是copyBlock是对NN的copy命令做出的反应而writeBlock响应的客户端的请求,也就是只是将数据从一个节点拷贝到另外一个节点,不会存在上边提到的流水线的东西。

ok,本部分介绍完毕,欢迎大家留言讨论!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: