您的位置:首页 > 大数据 > Hadoop

HDFS源码分析DataXceiver之整体流程

2016-06-03 14:32 525 查看
《HDFS源码分析之DataXceiverServer》一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。而处理每次读写请求时所创建的线程,就是本文要讲的DataXceiver。本文,我们来看下DataXceiver的具体实现,着重讲解下它得到数据读写请求后的整体处理流程。

首先,我们先看下DataXceiver的成员变量,具体如下:

[java] view plain copy

// 封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者

private Peer peer;

// 通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的

private final String remoteAddress; // address of remote side

private final String localAddress; // local address of this daemon

// DataNode节点进程实例datanode

private final DataNode datanode;

// DataNode节点配置信息dnConf

private final DNConf dnConf;

// DataXceiverServer线程实例dataXceiverServer

private final DataXceiverServer dataXceiverServer;

// 连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用

private final boolean connectToDnViaHostname;

// 接收到一个操作op的开始时间

private long opStartTime; //the start time of receiving an Op

// InputStream输入流socketIn

private final InputStream socketIn;

// OutputStream输出流socketOut

private OutputStream socketOut;

// 数据块接收器BlockReceiver对象blockReceiver

private BlockReceiver blockReceiver = null;

/**

* Client Name used in previous operation. Not available on first request

* on the socket.

* previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用

*/

private String previousOpClientName;

既然DataXceiver是为处理数据读写请求而创建的线程,那么Socket、输入流、输出流就是必不可少的成员。而首当其冲的Peer,便封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者,而接下来的输入流socketIn、输出流socketOut都是来自peer的socket。另外,DataXceiver还提供了通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的。

既然是由DataNode上的DataXceiverServer线程创建的,那么自然少不了datanode、dataXceiverServer、dnConf等变量,并且,它是专门用来处理数据读写请求的,自然也需要像数据块接收器BlockReceiver对象blockReceiver这种成员变量。dnConf是DNConf类型的数据节点DataNode上的配置信息。

剩下的几个,便是在处理具体的数据读写请求时用到的connectToDnViaHostname、opStartTime、previousOpClientName等变量。其中,connectToDnViaHostname标识连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用,opStartTime为接收到一个操作op的开始时间,最后的previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用。

下面我们再看下它的构造方法,只有一个private的,如下:

[java] view plain copy

/**

* 私有构造函数,需要Peer、DataNode、DataXceiverServer三个参数

*/

private DataXceiver(Peer peer, DataNode datanode,

DataXceiverServer dataXceiverServer) throws IOException {

/ peer、datanode、dataXceiverServer等成员变量直接赋值

this.peer = peer;

this.dnConf = datanode.getDnConf();

// 输入流socketIn、输出流socketOut来自peer的socket

this.socketIn = peer.getInputStream();

this.socketOut = peer.getOutputStream();

this.datanode = datanode;

this.dataXceiverServer = dataXceiverServer;

// connectToDnViaHostname取自数据节点配置信息dnConf

this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

// 远端remoteAddress和本地localAddress地址取自Peer

remoteAddress = peer.getRemoteAddressString();

localAddress = peer.getLocalAddressString();

if (LOG.isDebugEnabled()) {

LOG.debug("Number of active connections is: "

+ datanode.getXceiverCount());

}

}

但是,它提供了一个类的静态create()方法,用于DataXceiver对象的构造,代码如下:

[java] view plain copy

/**

* 提供了一个静态方法create(),调用私有构造函数构造DataXceiver对象

*/

public static DataXceiver create(Peer peer, DataNode dn,

DataXceiverServer dataXceiverServer) throws IOException {

return new DataXceiver(peer, dn, dataXceiverServer);

}

上述构造方法及静态create()方法都很简单,不再赘述。

接下来,我们再着重分析下,DataXceiver线程在启动后,是如何处理来自客户端或者其他数据节点发送的数据读写请求的。既然是线程,那么就不得不看看它的run()方法,代码如下:

[java] view plain copy

/**

* Read/write data from/to the DataXceiverServer.

* 从DataXceiverServer中读取或者往DataXceiverServer中写入数据

*/

@Override

public void run() {

int opsProcessed = 0;

Op op = null;

try {

// 在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系

dataXceiverServer.addPeer(peer, Thread.currentThread(), this);

// peer中设置socket写入超时时间

peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);

InputStream input = socketIn;

try {

// IOStreamPair为一个输入输出流对,既包含输入流,也包含输出流

IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,

socketIn, datanode.getXferAddress().getPort(),

datanode.getDatanodeId());

// 包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,

// 参数未配置的话默认为512,且最大也不能超过512

input = new BufferedInputStream(saslStreams.in,

HdfsConstants.SMALL_BUFFER_SIZE);

// 从saslStreams中获取输出流socketOut

socketOut = saslStreams.out;

} catch (InvalidMagicNumberException imne) {

LOG.info("Failed to read expected encryption handshake from client " +

"at " + peer.getRemoteAddressString() + ". Perhaps the client " +

"is running an older version of Hadoop which does not support " +

"encryption");

return;

}

// 调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in

super.initialize(new DataInputStream(input));

// We process requests in a loop, and stay around for a short timeout.

// This optimistic behaviour allows the other end to reuse connections.

// Setting keepalive timeout to 0 disable this behavior.

// 在一个do...while循环内完成请求的处理。

do {

// 更新当前线程名称,通过线程名标识进度的一种手段,不错

updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

try {

// 由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,

// 而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了

if (opsProcessed != 0) {

// 如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,

// 将其设置为设置peer(即socket)的读超时时间,

// 取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s

assert dnConf.socketKeepaliveTimeout > 0;

peer.setReadTimeout(dnConf.socketKeepaliveTimeout);

} else {

// 最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout

// 即取参数dfs.client.socket-timeout,参数未配置的话默认为60s

peer.setReadTimeout(dnConf.socketTimeout);

}

// 通过readOp()方法读取操作符op

op = readOp();

} catch (InterruptedIOException ignored) {

// Time out while we wait for client rpc

// 如果是InterruptedIOException异常,跳出循环

break;

} catch (IOException err) {

// Since we optimistically expect the next op, it's quite normal to get EOF here.

if (opsProcessed > 0 &&

(err instanceof EOFException || err instanceof ClosedChannelException)) {

if (LOG.isDebugEnabled()) {

LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");

}

} else {

throw err;

}

break;

}

// restore normal timeout

// 重新存储正常的超时时间,即dnConf的socketTimeout

if (opsProcessed != 0) {

peer.setReadTimeout(dnConf.socketTimeout);

}

// 设置操作的起始时间opStartTime

opStartTime = now();

// 通过processOp()方法根据操作符op调用相应的方法处理操作符op

processOp(op);

// 累加操作数

++opsProcessed;

} while ((peer != null) &&

(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));

// 循环的条件便是:peer未关闭且复用超时时间socketKeepaliveTimeout大于0

} catch (Throwable t) {

String s = datanode.getDisplayName() + ":DataXceiver error processing "

+ ((op == null) ? "unknown" : op.name()) + " operation "

+ " src: " + remoteAddress + " dst: " + localAddress;

if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {

// For WRITE_BLOCK, it is okay if the replica already exists since

// client and replication may write the same block to the same datanode

// at the same time.

if (LOG.isTraceEnabled()) {

LOG.trace(s, t);

} else {

LOG.info(s + "; " + t);

}

} else {

LOG.error(s, t);

}

} finally {

if (LOG.isDebugEnabled()) {

LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "

+ datanode.getXceiverCount());

}

// 更新当前线程名称

updateCurrentThreadName("Cleaning up");

// 关闭peer(socket)、输入流等资源

if (peer != null) {

dataXceiverServer.closePeer(peer);

IOUtils.closeStream(in);

}

}

}

run()方法的处理流程逻辑十分清晰,概括如下:

1、在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系;

2、peer中设置socket写入超时时间,取参数dfs.datanode.socket.write.timeout,参数未配置的话默认为8分钟;

3、获取IOStreamPair类型的saslStreams,其为一个输入输出流对,既包含输入流,也包含输出流;

4、包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,参数未配置的话默认为512,且最大也不能超过512;

5、从saslStreams中获取输出流socketOut;

6、调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in;

7、在一个do...while循环内完成请求的处理,循环的条件便是--peer未关闭且复用超时时间socketKeepaliveTimeout大于0:

7.1、更新当前线程名称,通过线程名标识进度的一种手段,不错,线程名此时为Waiting for operation #100(100为操作处理次数累加器的下一个值);

7.2、处理读超时时间设置:由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了。所以,最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout,即取参数dfs.client.socket-timeout,参数未配置的话默认为60s;如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,将其设置为设置peer(即socket)的读超时时间,取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s;

7.3、通过readOp()方法读取操作符op;

7.4、重新存储正常的超时时间,即dnConf的socketTimeout;

7.5、设置操作的起始时间opStartTime,为当前时间;

7.6、通过processOp()方法根据操作符op调用相应的方法处理操作符op;

7.7、累加操作数opsProcessed;

8、更新当前线程名称:Cleaning up;

9、关闭peer(socket)、输入流等资源。

实际上,对于读写请求的处理的一个主线,便是在socket未关闭的情况下,不停的读取操作符,然后调用相应的方法处理,也就是do...while循环内的op = readOp()-----processOp(op)这一处理主线。

下面,我们来看下读取操作符的readOp()方法,它位于DataXceiver的父类Receiver中。代码如下:

[java] view plain copy

/** Read an Op. It also checks protocol version. */

protected final Op readOp() throws IOException {

// 首先从输入流in中读入版本号version,short类型,占2个字节

final short version = in.readShort();

// 校验版本号version是否与DataTransferProtocol中的DATA_TRANSFER_VERSION相等,该版本中为28

if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {

throw new IOException( "Version Mismatch (Expected: " +

DataTransferProtocol.DATA_TRANSFER_VERSION +

", Received: " + version + " )");

}

// 调用Op的read()方法,从输入流in中获取操作符op

return Op.read(in);

}

代码中有详细注释,不再解释。继续追踪Op的read()方法,代码如下:

[java] view plain copy

private static final int FIRST_CODE = values()[0].code;

/** Return the object represented by the code. */

private static Op valueOf(byte code) {

final int i = (code & 0xff) - FIRST_CODE;

return i < 0 || i >= values().length? null: values()[i];

}

/** Read from in */

public static Op read(DataInput in) throws IOException {

return valueOf(in.readByte());

}

很简单,通过read()方法从输入流读取byte,并通过valueOf()方法,首先将byte转化为int,然后减去Op操作符枚举类型的第一个值:WRITE_BLOCK,即80,得到i。如果i小于0或者大于枚举中操作符的个数,说明输入流中传入的操作符不在枚举范围内,否则利用i作为索引取出相应的操作符。枚举类型如下:

[java] view plain copy

WRITE_BLOCK((byte)80),

READ_BLOCK((byte)81),

READ_METADATA((byte)82),

REPLACE_BLOCK((byte)83),

COPY_BLOCK((byte)84),

BLOCK_CHECKSUM((byte)85),

TRANSFER_BLOCK((byte)86),

REQUEST_SHORT_CIRCUIT_FDS((byte)87),

RELEASE_SHORT_CIRCUIT_FDS((byte)88),

REQUEST_SHORT_CIRCUIT_SHM((byte)89);

比较简单,写数据块为80,读数据块为81等,不再一一介绍。操作符为int类型,也就意味着它占4个字节。

接下来,我们再看下处理操作符的processOp()方法,同样在DataXceiver的父类Receiver中。代码如下:

[java] view plain copy

/** Process op by the corresponding method. */

protected final void processOp(Op op) throws IOException {

// 通过调用相应的方法处理操作符

switch(op) {

case READ_BLOCK:// 读数据块调用opReadBlock()方法

opReadBlock();

break;

case WRITE_BLOCK:// 写数据块调用opWriteBlock()方法

opWriteBlock(in);

break;

case REPLACE_BLOCK:// 替换数据块调用opReplaceBlock()方法

opReplaceBlock(in);

break;

case COPY_BLOCK:// 复制数据块调用REPLACE()方法

opCopyBlock(in);

break;

case BLOCK_CHECKSUM:// 数据块检验调用opBlockChecksum()方法

opBlockChecksum(in);

break;

case TRANSFER_BLOCK:// 移动数据块调用opTransferBlock()方法

opTransferBlock(in);

break;

case REQUEST_SHORT_CIRCUIT_FDS:

opRequestShortCircuitFds(in);

break;

case RELEASE_SHORT_CIRCUIT_FDS:

opReleaseShortCircuitFds(in);

break;

case REQUEST_SHORT_CIRCUIT_SHM:

opRequestShortCircuitShm(in);

break;

default:

throw new IOException("Unknown op " + op + " in data stream");

}

}

一目了然,根据操作符的不同,调用不同的方法去处理。比如读数据块调用opReadBlock()方法,写数据块调用opWriteBlock()方法,替换数据块调用opReplaceBlock()方法等等,读者可自行阅读。
至此,HDFS源码分析DataXceiver之整体流程全部叙述完毕。后续文章会陆续推出对于写数据块、读数数据块、替换数据块、移动数据块等的详细操作,以及DataXceiver线程中用到的数据块发送器BlockSender、数据块接收器BlockReceiver的详细分析,敬请期待!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: