HDFS1.0源代码解析—DataNode数据接收线程DataXceiverServer与DataXceiver解析
2012-07-18 21:45
567 查看
在介绍DataXceiverServer线程之前首先介绍一下,DN启动后一直执行的线程(也就是服务线程)哪些。首选DN本身就是一个线程类,该线程必然会一直执行。在DN的run方法中
下边开始DataXceiverServer线程类的主要功能,
这样我们就清晰了DataXceiverServer只是充当了一个接收任务和分配任务的决策,具体实现是在DataXceiver中负责完成的,这又一次体现了模块化设计的思想。从注释中开一看出DataXceiverServer使用一个变量maxXceiverCount控制socket连接诶的数目。
下面开始介绍真正工作的DataXceiver类的主要功能,当然我们已经知道DataXceiver是一个线程类,当然我们最先要关注的还是它的run方法
再看writeBlock,这是处理客户端或者其他DN写入数据请求的函数
所以需要发送流水线长度(也就是要写入的节点的数目),mirrorIn、mirrorOut是流水线中节点与其他上游和下游节点交互的流。其中代码的大部分是用来建立数据的流水线,因为在写入数据之前需要保证这条流水线是打通的,所以每节点都将头信息发送给下游节点,最后一个节点除外。如成功一次向上游发送确认信息,建立起流水线,再进行数据传输。最终数据传输使用的是BlockReceiver类的receiveBlock方法。
getBlockChecksum方法是一个简单方法,主要是返回一个block的检验信息。
copyBlock与writeBlock很相似,主要的区别是copyBlock是对NN的copy命令做出的反应而writeBlock响应的客户端的请求,也就是只是将数据从一个节点拷贝到另外一个节点,不会存在上边提到的流水线的东西。
ok,本部分介绍完毕,欢迎大家留言讨论!
1432 // start dataXceiveServer 1433 dataXceiverServer.start(); 1434 ipcServer.start();可以看出会启动一个DataXceiverServer类型的线程,这个线程的主要作用是什么呢,这就是本篇博客要讲述的重点。另外我们还可以看到ipcServer的线程,这个就是前边博客中介绍的RPC机制中的Server线程,负责处理RPC请求,这些请求来在DN之间,貌似在进行recovery的时候使用,具体没有搞清楚。
下边开始DataXceiverServer线程类的主要功能,
43 class DataXceiverServer implements Runnable, FSConstants { 46 ServerSocket ss; 47 DataNode datanode;看以看出DataXceiverServer也是一个线程类,其中两个比较重要的成员变量是一个ServerSocket对象和DataNode对象本身,其中ServerSocket就是DN负责接收socket连接的对象。我们来看一下这个类最核心的部分,也就是run方法
128 public void run() { 129 while (datanode.shouldRun) { 130 try { 131 Socket s = ss.accept(); 132 s.setTcpNoDelay(true); 133 new Daemon(datanode.threadGroup, 134 new DataXceiver(s, datanode, this)).start();从代码中可以看出DataXceiverServer没接收一个socket连接就会启动一个DataXceiver的线程来处理这一个socket的连接。
这样我们就清晰了DataXceiverServer只是充当了一个接收任务和分配任务的决策,具体实现是在DataXceiver中负责完成的,这又一次体现了模块化设计的思想。从注释中开一看出DataXceiverServer使用一个变量maxXceiverCount控制socket连接诶的数目。
下面开始介绍真正工作的DataXceiver类的主要功能,当然我们已经知道DataXceiver是一个线程类,当然我们最先要关注的还是它的run方法
78 public void run() { 79 DataInputStream in=null; 80 try { 81 in = new DataInputStream( 82 new BufferedInputStream(NetUtils.getInputStream(s), 83 SMALL_BUFFER_SIZE)); 84 short version = in.readShort(); 85 if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) { 86 throw new IOException( "Version Mismatch" ); 87 } 88 boolean local = s.getInetAddress().equals(s.getLocalAddress()); 89 byte op = in.readByte(); 98 switch ( op ) { 99 case DataTransferProtocol.OP_READ_BLOCK: 100 readBlock( in ); 107 case DataTransferProtocol.OP_WRITE_BLOCK: 108 writeBlock( in ); 115 case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination 116 replaceBlock(in); 119 case DataTransferProtocol.OP_COPY_BLOCK: 120 // for balancing purpose; send to a proxy source 121 copyBlock(in); 124 case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block 125 getBlockChecksum(in);可以看出run方法的主要功能是读取请求的类型,根据类型指相应的操作,从代码中可以看出有五种类型的需求。按个来进行分析,首选来看readBlock,从函数名可以看出这是处理客户端或者其他DN的读取请求的函数
150 long blockId = in.readLong(); 151 Block block = new Block( blockId, 0 , in.readLong()); 152 153 long startOffset = in.readLong(); 154 long length = in.readLong(); 155 String clientName = Text.readString(in); 179 BlockSender blockSender = null; 188 try { 189 blockSender = new BlockSender(block, startOffset, length, 190 true, true, false, datanode, clientTraceFmt); 196 out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status 197 long read = blockSender.sendBlock(out, baseStream, null); // send data首先获取要读取的blockId,因为可能一次只读取block中某一部分的数据,所以需要读取内容的便宜和长度。真正读取过程是由BlockSender完成的,该类下一篇博客中会与BlockReceiver类一起介绍
再看writeBlock,这是处理客户端或者其他DN写入数据请求的函数
266 //获取blockid和generationstamp 267 Block block = new Block(in.readLong(), 268 dataXceiverServer.estimateBlockSize, in.readLong()); 272 int pipelineSize = in.readInt(); // num of datanodes in entire pipeline 274 boolean isRecovery = in.readBoolean(); // is this part of recovery? 275 String client = Text.readString(in); // working on behalf of this client 276 boolean hasSrcDataNode = in.readBoolean(); // is src node info present 281 int numTargets = in.readInt(); 315 DataOutputStream mirrorOut = null; // stream to next target 316 DataInputStream mirrorIn = null; // reply from next target 317 Socket mirrorSock = null; // socket to next target 318 BlockReceiver blockReceiver = null; // responsible for data handling 324 blockReceiver = new BlockReceiver(block, in, 325 s.getRemoteSocketAddress().toString(), 326 s.getLocalSocketAddress().toString(), 327 isRecovery, client, srcDataNode, datanode); 429 blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, 430 mirrorAddr, null, targets.length);这是列出了一些核心的代码,这里需要说明一下HDFS数据写入的方式,因为HDFS采用了冗余备份的机制,所以一个block一般要写入到3个DN上,HDFS采用的使用流水线的写入机制,例如要将数据写入到A、B、C三个节点上,首先就建立起A->B->C的流水线,数据线写入到节点A,A在将数据写入到节点B,B再将数据写入到节点C,最后C将写入是否状态返回B,B返回A,A返回客户端,写入完成。
所以需要发送流水线长度(也就是要写入的节点的数目),mirrorIn、mirrorOut是流水线中节点与其他上游和下游节点交互的流。其中代码的大部分是用来建立数据的流水线,因为在写入数据之前需要保证这条流水线是打通的,所以每节点都将头信息发送给下游节点,最后一个节点除外。如成功一次向上游发送确认信息,建立起流水线,再进行数据传输。最终数据传输使用的是BlockReceiver类的receiveBlock方法。
getBlockChecksum方法是一个简单方法,主要是返回一个block的检验信息。
copyBlock与writeBlock很相似,主要的区别是copyBlock是对NN的copy命令做出的反应而writeBlock响应的客户端的请求,也就是只是将数据从一个节点拷贝到另外一个节点,不会存在上边提到的流水线的东西。
ok,本部分介绍完毕,欢迎大家留言讨论!
相关文章推荐
- HDFS1.0源代码解析—DataNode端数据存储和管理DataStorage和FSDataset解析
- HDFS1.0源代码解析—DataNode启动(三)
- HDFS1.0源代码解析—DataNode状态切换
- HDFS1.0源代码解析—DataNode启动(二)
- HDFS1.0源代码解析—DataNode启动(一)
- HDFS1.0源代码解析—Hadoop的RPC机制之Server端解析
- 大数据之路-Hadoop-5-HDFS原理解析及NameNode、DataNode工作机制
- HDFS写操作(二) DataNode DataXceiverServer DataXceiver
- hadoop源代码分析(2)-hdfs.server.datanode包-DataXceiverServer类【原创】
- HDFS1.0源代码解析—DataNode类主要数据成员和函数
- HDFS1.0源代码解析—数据传输和接受的类BlockSender和BlockReceiver
- ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: Incompatible namespaceIDs
- HDFS Datanode与Client之间的数据传输
- HDFS DataNode 设计实现解析
- HDFS的create函数解析及如何选择存储Block的DataNode
- 使用线程接收串口接收数据,DLT645 2007 智能电表抄表代码
- ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: All directories in dfs.data.dir are invalid
- HADOOP :WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Problem connecting to server: xxx/xxx
- 后端分布式系列:分布式存储-HDFS DataNode 设计实现解析
- ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceID