您的位置:首页 > 运维架构

Hadoop RPC源码解析——Server类(一)

2015-02-15 21:01 288 查看
Hadoop RPC主要由三大部分组成:Client、Server和RPC,如下表所示。

内部类
功能
Client
连接服务器、传递函数名和相应的参数、等待结果

Server
主要接受Client的请求、执行相应的函数、返回结果

RPC
外部编程接口,主要是为通信的服务方提供代理

这三部分在Hadoop RPC架构中的位置如下图所示:



Hadoop采用了Master/Slave结构。其中,Master是整个系统的单节点,这是制约系统性能和扩展性的最关键因素之一,而Master通过Server接受并处理所有Slave发送的请求,这就要求Server类将高并发和可扩展性作为设计目标。为此Server采用了具有提高并发处理能力的技术,如线程池、事件驱动和Reactor设计模式等。

Reactor是并发编程中的一种基本事件驱动的设计模式。它具有以下两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免复杂的同步处理。典型的Reactor模式的工作原理图



ipc包下的server类实现了典型的Reactor设计模式,它主要分为三个步骤:接收请求、处理请求和返回结果,如下图3-12所示。

1、       接收请求:该阶段的主要任务是接收来自各个客户端的请求,并将它们封装成固定的格式(call)放到一个共享队列(callQueue)中。该阶段内部又分为两个子阶段:建立连接和接收请求,分别由Listener和Reader完成。整个Server只有一个Listener线程,它统一负责监听来自客户端的连接请求。一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理。Listener和Reader线程内部各包含了一个Selector对象,分别用于监听可接受事件和可读事件。因此Listener线程主要监听是否有新连接请求到达,而Reader线程主要监听客户端连接中是否有新的RPC请求到达,并将RPC请求封装成Call对象,放到callQueue中。

2、       处理请求

该阶段主要任务是从callQueue中获取Call对象,在执行对应的函数调用后尝试直接将结果返回给客户端。但由于某些函数调用返回结构很大或者网络速度过慢,可能难以将结果一次性发送给客户端,此时Handler将尝试将后续任务交给Responder线程。

3、       返回结果

Server端仅有一个Responder线程,它内部包含一个Selector对象,用于监听可写事件。当Handler没能将结果一次性发送给客户端时,会向该Selector对象注册可写事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。



本文主要解析Server类。Server类主要包含了以下几个内部类

内部类
功能
Call
用于存储客户端发来的请求

Listener
监听类,用于监听客户端发来的请求,同时该类内部还有一个静态类Listener.Reader,当监听器监听到用户请求时,并让Reader读取用户请求

Responder
响应RPC请求类,请求处理完毕,由Responder发送给请求客户端

Connection
连接类,真正的客户端请求读取逻辑在这个类中

Handler
请求处理类,会循环阻塞读取callQueue中的call对象,对其进行操作

分析完客户端的代码,接下来我们分析服务器端的代码,也就是Server类。由以上的分析我们知道,Client端的底层通信直接采用了阻塞式的IO编程,但服务器端却没有采用阻塞式编程,因为当很多Client想要连接到服务端时,如果采用阻塞的IO,那么会对服务器端的性能造成很大的影响。因此Hadoop采用了Java NIO来实现Server端。

Server类是一个抽象类,不能被初始化,那么问题就来了,Hadoop中怎样初始化RPC的服务器呢。但同时我们应该要想到,当NameNode初始化时一定会初始化RPC的服务端,下面列出NameNode初始化的源代码

//代码十二
//org.apache.hadoop.hdfs.server.namenode#initialize
/**
* 初始化NameNode
*/
private void initialize(Configuration conf) throws IOException {
// 创建 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//获得serviceRPCServer
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//获得Server
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
……
startHttpServer(conf);
this.server.start();  //启动RPC Server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
startTrashEmptier(conf);
}

通过以上代码可知,RPC的server对象是通过RPC类的getServer()方法得到的,下面来具体看这个方法。

//代码十三
//RPC#getServer
public static Server getServer(final Object instance, final String bindAddress, final int port,final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
查看Server类的构造函数

//代码十四
// RPC.Server#Server
public Server(Object instance, Configuration conf, String bindAddress,  int port,
int numHandlers, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
su
4000
per(bindAddress, port, Invocation.class, numHandlers, conf,
classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
}
这里我们可以发现在Serever类的构造函数中,调用了父类的构造函数。因此可以说getServer()方法是一个创建Server对象的工厂方法,但创建的却是RPC.Server类,而该类又调用了父类ipc.Server类的构造函数,因此我们就明白了Server的初始化方法。查看父类的构造函数

//代码十五
//Server#Server
protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
this.paramClass = paramClass;
this.handlerCount = handlerCount;
……
// Start the listener here and let it bind to the port
//创建一个Listener类的实例
listener = new Listener();
this.port = listener.getAddress().getPort();
this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);

// Create the responder here
//创建一个Responder类的实例
responder = new Responder();

if (isSecurityEnabled) {
SaslRpcServer.init(conf);
}
}


查看Listener的构造方法

//代码十六
//Server.Listener#Listener
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();//获得一个Server Socket实例
acceptChannel.configureBlocking(false);//设置成非阻塞模式

// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);//将server socket绑定到address的地址和port端口
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();//获得一个Selector实例
readers = new Reader[readThreads];  //创建一个Reader对象的数组
readPool = Executors.newFixedThreadPool(readThreads);//创建一个线程池
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);//注册accept事件
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
这里我们创建了一个Listener的内部类Reader的数组readers,里面又创建了readThreads个Reader类的实例,并且启动了这些Reader线程,此后这些线程就会去执行run()方法。查看Reader类的run()方法

//代码十七
//Server.Listener.Reader#run
public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
//阻塞直到有连接到来,在startAdd()中来wakeup
readSelector.select();
while (adding) {
//等待,直到finishAdd后来notify
this.wait(1000);
}
//获得感兴趣的事件发生的channl的key并读其中的数据
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
if (running) {                      // unexpected -- log it
LOG.info(getName() + " caught: " +
StringUtils.stringifyException(e));
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
}
}
}
}


上面我们看到,当该线程刚开始运行时由于没有客户端连接到服务器,所以会一直阻塞在“readSelector.select()”处。

回到代码十五,我们接下来看Responder类的构造函数

//代码十八
// Server#Responder#Responder
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}
在该类中,创建了一个Selector对象,并赋值给成员变量writeSelector。其中“pending”代表等待被注册的连接数。接下来就是要启动Server去提供服务。我们查看Server类的start()方法

//代码十九
// Server#start
/**
* 启动服务器。在处理调用之前必须先启动服务器*/
public synchronized void start() {
responder.start();     //启动Responder
listener.start();       //启动Listener
handlers = new Handler[handlerCount];    //创建handler池

for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);   //创建handler
handlers[i].start();             //启动handler
}
}


注意,其中的内部类Responder、Listener、Handler都继承了Thread,所以这里将这三个类对应的实例启动后,就启动了这些线程,并相继调用这些线程的run()方法。

       从上表得知Server中的内部类Listener用来监听连接,那么我们就来看一下Listener类的run()方法

代码二十
// Server.Listener#run
public void run() {
……
while (running) {
SelectionKey key = null;
try {
//等待连接过来,若有感兴趣的事件发生,则返回
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
//如果为可接受事件,则进入下面函数进行具体处理
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
……
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
cleanupConnections(false);
}
……
}
}
查看doAccept()方法来了解具体是怎样接受请求的

//代码二十一
// Server.Listener#doAccept
void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
//建立连接
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
//从readers池中获得一个reader线程
Reader reader = getReader();
try {
//激活readSelector,设置adding为true
reader.startAdd();
//将读事件设置成感兴趣的事件
SelectionKey readKey = reader.registerChannel(channel);
//创建一个连接对象
c = new Connection(readKey, channel, System.currentTimeMillis());
//将connection对象注入readKey
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
} finally {
reader.finishAdd();
}
}
}


代码二十一对接收到的连接进行一些处理,如启动readers中的一个可用线程,该线程专门用来接受这个连接的读事件,在该连接上注册感兴趣的事件为读事件,然后将该连接附加到该key上。下面分别查看代码二十一中涉及的代码。首先查看getReader()方法。

//代码二十二
// Server.Listener#getReader
//该方法将返回Readers中下一个可用的reader,循环数组实现
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}


接下来查看startAdd()方法,在该方法中,唤醒了reader的run()方法中阻塞的select()方法。由于adding为true,所以会进入while循环处于wait状态。如果当执行wakeup时,readSelector线程并没有阻塞在select()处,那么任何时候只要执行到select()方法,该方法都会返回,如果没有返回值那就在while里面等待,直到在finishAdd()中唤醒该线程。

//代码二十三
// Server.Listener.Reader#startAdd
//该函数用于让reader处于等待新channel被注册到readSelect上的状态
public void startAdd() {
adding = true;
readSelector.wakeup();
}


接下来查看代码二十一中registerChannel()方法。该方法非常简单,就是将channel感兴趣的读事件注册到readSelector上。

//代码二十四
// Server.Listener.Reader#startAdd
public synchronized SelectionKey registerChannel(SocketChannel channel)
throws IOException {
return channel.register(readSelector, SelectionKey.OP_READ);
}


回到代码二十一的Connection构造方法。下面查看Server的内部类Connection的构造方法Connection。

//代码二十五
// Server.Connection#Connection
public Connection(SelectionKey key, SocketChannel channel,
long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.unwrappedData = null;
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<Call>();
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize);
} catch (IOException e) {
LOG.warn("Connection: unable to set socket send buffer size to " +
socketSendBufferSize);
}
}
}
创建完Connection对象后,我们将它加入到connectionList这个Server类的成员变量中,该变量存储了从客户端到服务器的所有连接。处理完以上事情后就可以把检测通道上的读事件交给readSelector了,所以我们调用finishAdd()方法来唤醒reader类的run()方法中正在wait的线程。finishAdd()方法代码如下,很简单,就是唤醒线程,让该线程监听channel上的读事件
//代码二十六
// Server.Listener.Reader#finishAdd
public synchronized void finishAdd() {
adding = false;
this.notify();
}
下面的分析具体参见Hadoop RPC源码分析——server类(二)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: