DataNode内部的调度线程
2012-08-07 14:30
731 查看
首先DataNode自身就是一个Runnable的实现,也就是说DataNode是以一个单独的线程在运行着的。
DataXceiver根据前面建立的Socket,建立DataInputStream,读取请求的操作,请求操作可以分为读取(OP_READ_BLOCK),写入(OP_WRITE_BLOCK),替换(OP_REPLACE_BLOCK),拷贝block(OP_COPY_BLOCK),获取block的checksum(OP_BLOCK_CHECKSUM)。
OP_READ_BLOCK:调用DataXceiver.readBlock,在方法内新建BlockSender,主要是在BlockSender中将数据发送出去,数据以一个个packet的形式发送
OP_WRITE_BLOCK:调用的是DataXceiver.writeBlock,在方法内新建BlockReceiver,BlockReceiver内需要建立多个数据流:
DataInputStream in:连接DFSClient或者前面一个DataNode的输入流.
DataOutputStream mirrorOut: 连接后一个DataNode的输出流(最后一个DataNode为null)
DataInputStream mirrorIn:连接后一个DataNode的输入流(最后一个DataNode为null)
DataOutputStream replyOut:连接DFSClient或者前面一个DataNode的输出流
他们的关系如下:
DataXceiver.writeBlock最终会调用BlockReceiver.receiveBlock方法,该方法从in中以packet的形式读取到数据流,然后通过mirrorOut写入到下一个DataNode中,如果是DFSClient发起的操作的话,BlockReceiver会使用一个单词的线程PacketResponder来处理mirrorIn的响应。
BlockReceiver和PacketResponder的交互是通过PacketResponder的属性ackQueue(LinkedList<Packet>),当BlockReceiver的mirrorOut写完一个packet,在ackQueue里面加入一条记录,PacketResponder主循环中是不断的从ackQueue中获得记录(没有就阻塞,生产者-消费者模型),当获得记录后,还需要阻塞的从mirrorIn中获得下游DataNode的Ack回应, 对于DataNode1来说mirrorIn就等于DataNode2的replyOut,当收到Ack回应后和ackQueue中的Packet序列号比对,验证是否是刚才发送的那个packet;当验证通过后,DataNode1继续向上游的DataNode发送ack的回应。如果这个是此block的最后一个packet,DataNode会调用FSDataset.finalizeBlock方法,此方法将在blocksBeingWritten目录下的block和block meta文件rename到current目录下。在此之后,还需要通知namenode block已经写入完成。BlockReceiver和PacketResponder的交互图如下:
OP_REPLACE_BLOCK和OP_WRITE_BLOCK类似,OP_COPY_BLOCK和OP_READ_BLOCK类似。
DataBlockScanner最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量,最小扫描速度是1 MB/s,默认扫描周期是21天,扫描周期可通过dfs.datanode.scan.period.hours来设置
该类中的属性TreeSet<BlockScanInfo> blockInfoSet用来保存block的扫描时间,离现在时间间隔最长的排在首位,扫描的过程如下:
检查blockInfoSet中的第一个block的最后扫描时间距离现在是否超过扫描周期,如果不超过,休眠一定时间然后开始下次检查
如果扫描周期,那么对该block进行校验,校验使用BlockSender来读取,读取的数据输出到NullOutputStream,我们知道BlockSender在读取数据时,可以检查checksum,以此来判断是否校验成功。如果校验失败,进行第二次校验,如果两次都失败,说明该block有错误,通知namenode
DataXceiverServer,DataXceiver
DataNode在Block的数据流读写方面都交给了DataXceiverServer,DataXceiverServer也是运行在一个单独的线程当中,在主循环当中阻塞监听dfs.datanode.address配置的端口号,当有连接过来时,新建一个DataXceiver实例,并且在单独的线程当中运行DataXceiver。DataXceiver根据前面建立的Socket,建立DataInputStream,读取请求的操作,请求操作可以分为读取(OP_READ_BLOCK),写入(OP_WRITE_BLOCK),替换(OP_REPLACE_BLOCK),拷贝block(OP_COPY_BLOCK),获取block的checksum(OP_BLOCK_CHECKSUM)。
OP_READ_BLOCK:调用DataXceiver.readBlock,在方法内新建BlockSender,主要是在BlockSender中将数据发送出去,数据以一个个packet的形式发送
OP_WRITE_BLOCK:调用的是DataXceiver.writeBlock,在方法内新建BlockReceiver,BlockReceiver内需要建立多个数据流:
DataInputStream in:连接DFSClient或者前面一个DataNode的输入流.
DataOutputStream mirrorOut: 连接后一个DataNode的输出流(最后一个DataNode为null)
DataInputStream mirrorIn:连接后一个DataNode的输入流(最后一个DataNode为null)
DataOutputStream replyOut:连接DFSClient或者前面一个DataNode的输出流
他们的关系如下:
DataXceiver.writeBlock最终会调用BlockReceiver.receiveBlock方法,该方法从in中以packet的形式读取到数据流,然后通过mirrorOut写入到下一个DataNode中,如果是DFSClient发起的操作的话,BlockReceiver会使用一个单词的线程PacketResponder来处理mirrorIn的响应。
BlockReceiver和PacketResponder的交互是通过PacketResponder的属性ackQueue(LinkedList<Packet>),当BlockReceiver的mirrorOut写完一个packet,在ackQueue里面加入一条记录,PacketResponder主循环中是不断的从ackQueue中获得记录(没有就阻塞,生产者-消费者模型),当获得记录后,还需要阻塞的从mirrorIn中获得下游DataNode的Ack回应, 对于DataNode1来说mirrorIn就等于DataNode2的replyOut,当收到Ack回应后和ackQueue中的Packet序列号比对,验证是否是刚才发送的那个packet;当验证通过后,DataNode1继续向上游的DataNode发送ack的回应。如果这个是此block的最后一个packet,DataNode会调用FSDataset.finalizeBlock方法,此方法将在blocksBeingWritten目录下的block和block meta文件rename到current目录下。在此之后,还需要通知namenode block已经写入完成。BlockReceiver和PacketResponder的交互图如下:
OP_REPLACE_BLOCK和OP_WRITE_BLOCK类似,OP_COPY_BLOCK和OP_READ_BLOCK类似。
DataBlockScanner
DataBlockScanner也是在一个单独的线程里面进行执行,作用是周期性的对block进行校验,当DFSClient读取时,也会通知DataBlockScanner校验结果。DataBlockScanner最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量,最小扫描速度是1 MB/s,默认扫描周期是21天,扫描周期可通过dfs.datanode.scan.period.hours来设置
该类中的属性TreeSet<BlockScanInfo> blockInfoSet用来保存block的扫描时间,离现在时间间隔最长的排在首位,扫描的过程如下:
检查blockInfoSet中的第一个block的最后扫描时间距离现在是否超过扫描周期,如果不超过,休眠一定时间然后开始下次检查
如果扫描周期,那么对该block进行校验,校验使用BlockSender来读取,读取的数据输出到NullOutputStream,我们知道BlockSender在读取数据时,可以检查checksum,以此来判断是否校验成功。如果校验失败,进行第二次校验,如果两次都失败,说明该block有错误,通知namenode
DataTransfer
DataTransfer是在DataNode处理NameNode返回的各种命令中的一个命令:DNA_TRANSFER,DataNode处理此命令时,对于每一个Block,新建DataTransfer的实例在单独的线程当中运行,在DataTransfer的主循环当中建立和其他DataNode的pipeline,然后利用BlockSender发送block,其他DataNode的处理流程在DataXceiver当中。ipcServer
在DataNode启动的时候,会创建一个ipcServer,这个ipcServer是Hadoop的ipc框架的Server实例,主要是在DataNode和DataNode之间recover block时使用,recover block会在两个地方发生,一个是namenode返回的recover命令,一个是DFSClient写数据时遇到DataNode错误时触发相关文章推荐
- Hadoop源码学习笔记(3) ——初览DataNode及学习线程
- HDFS1.0源代码解析—DataNode数据接收线程DataXceiverServer与DataXceiver解析
- DataNode内部的各种数据结构
- 理解Node.js的事件循环(代码是异步单线程,内部实现用的还是进程和线程,基于池化的线程实现异步)
- datanode启动开启了那些任务线程
- Hadoop源码学习笔记(3) ——初览DataNode及学习线程
- hadoop 起步的时候datanode报错 Problem connecting to server---转
- Hadoop RPC整个使用流程——以DataNode向NameNode注册为例
- sql server 2008 System.Data.SqlClient.SqlException (0x80131904): 查询处理器未能为执行并行查询启动必要的线程资源 处理方法
- HTTP请求中的form data,request payload,query string parameters以及在node服务器中如何接收这些参数
- hadoop HA 集群启动发现现datanode没有启动,namenode clusterID与datanode clusterID不兼容,不匹配。
- cannot register datanode 0.0.0.0:1002,because current license expired time is
- hadoop datanode源码分析
- ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: Incompatible namespaceIDs
- HDFS DataNode 设计实现解析
- Hadoop集群动态添加datanode节点步骤
- org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in /
- org.apache.hadoop.hdfs.server.datanode.DataNode: Invalid dfs.datanode.data.dir /chunk : java.io.Fil
- Hadoop中DataNode与NameNode之间的心跳机制
- Hadoop DataNode启动之DataBlockScanner