您的位置:首页 > 编程语言

HDFS1.0源代码解析—Hadoop的RPC机制之Client解析

2012-07-07 00:38 483 查看
好久没有更新Hadoop相关的博客了,实在是各种压力缠身,各种东西都没准备好,面对即将找工作有点没有了节奏。

ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。

开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。

Hadoop中的RPC比较难理解的部分就是动态代理的实现,下边引用一个其他人博客里的图片(/article/1765800.html),感觉这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。



结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析

在DataNode.java的startDataNode函数中有这样一个调用

348     this.namenode = (DatanodeProtocol)
349       RPC.waitForProxy(DatanodeProtocol.class,
350                        DatanodeProtocol.versionID,
351                        nameNodeAddr,
352                        conf);
我们可以看到通过调用RPC的waitForProxy方法创建了一个DatanodeProtocol对象namenode(其中DatanodeProtocol实现了接口VersionedProtocol),那么namenode应该就是前边提到的匿名实现类的对象,通过这个对象就可以与NN进行交互了。

下边看一下RPC中的相关实现:

首先看一下waitForProxy函数干了些什么

329     while (true) {
330       try {
331         return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
332       } catch(ConnectException se) {  // namenode has not been started
很明显waitForProxy是保证NN出现一些意外的情况下,我们还是可以获得相应的交互对象。那么接着来看getProxy的作用:

392     VersionedProtocol proxy =
393         (VersionedProtocol) Proxy.newProxyInstance(
394             protocol.getClassLoader(), new Class[] { protocol },
395             new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
这里才是生成RPC对象的入口(姑且就称作RPC对象),前两个参数 protocol.getClassLoader(), new Class[] { protocol },比较好理解,一个是获得相应的classloader(因为需要在运行是动态的生成对象),第二个就是说明相应的接口类。可能现在最感兴趣的是第三个参数new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout),我们前边提到过 Proxy需要与InvocationHandler相结合才能达到想要的效果,那么第三个参数到底是不是InvocationHandler的对象呢,我们看一下Invoke类的相关实现。

203   private static class Invoker implements InvocationHandler {
果然没有令我们失望,这与前面的分析完全吻合。

208     public Invoker(Class<? extends VersionedProtocol> protocol,
209         InetSocketAddress address, UserGroupInformation ticket,
210         Configuration conf, SocketFactory factory,
211         int rpcTimeout) throws IOException {
212       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
213           ticket, rpcTimeout, conf);
214       this.client = CLIENTS.getClient(conf, factory);
215     }
看一下Invoke构造函数主要完成的工作,初始化一个 Client.ConnectionId remoteId;,可以看出这个对象主要封装了服务器地址,代理接口的类型以及连接服务器时用到的一些参数。另外一个关键的对象是client,就是同感client对象与NN进行交互。

前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下

225       ObjectWritable value = (ObjectWritable)
226         client.call(new Invocation(method, args), remoteId);
代码很简短,主要就是调用了client的call方法。其中比较感兴趣的参数应该是new Invocation(method, args)这个Invocation的对象,其实从字面意思我们就可以理解到这个对象封装是具体调用的方法和参数(这些信息需要传递到服务器端,执行服务器端相应的方法)。其实这个Invocation类实现了Writable接口,熟悉源代码的童鞋应该知道这是Hadoop序列化和反序列话的一个接口,作用就明显了,那就是方便把这些信息以序列化的方式传递到服务器端。

下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:

1043   public Writable call(Writable param, ConnectionId remoteId)
1044                        throws InterruptedException, IOException {
1045     Call call = new Call(param);
1046     Connection connection = getConnection(remoteId, call);
1047     connection.sendParam(call);                 // send the parameter
1048     boolean interrupted = false;
1049     synchronized (call) {
1050       while (!call.done) {
1051         try {
1052           call.wait();                           // wait for the result
1053         } catch (InterruptedException ie) {
1054           // save the fact that we were interrupted
1055           interrupted = true;
1056         }
1057       }
首先我们可以看到传递给服务器的被序列化后的参数被封装成了一个Call类型的对象,很明显我们就要问一下,你会什么有这么做呢?答案是这样的,call是一个设计很巧妙的类,它封装了查询的信息和查询的结果,阻塞查询线程(查询过程是一个同步的过程)。
接着我来看getConnection(remoteId, call);干了些什么,核心代码如下:

1188     do {
1189       synchronized (connections) {
1190         connection = connections.get(remoteId);
1191         if (connection == null) {
1192           connection = new Connection(remoteId);
1193           connections.put(remoteId, connection);
1194         }
1195       }
1196     } while (!connection.addCall(call));
1197
1198     //we don't invoke the method below inside "synchronized (connections)"
1199     //block above. The reason for that is if the server happens to be slow,
1200     //it will take longer to establish a connection and that will slow the
1201     //entire system down.
1202     connection.setupIOstreams();


从代码中我们可以看出在Connection类中有一个connections的变量保存到不同节点的connection(类似一个连接池,其实保存的就是一坨connection对象,具体连接的信息在connection对象的内部)。其中关键的部分是connection对象的生成,这部分的代码比较多,简单的介绍下过程大家结合理解代码,其中主要的部分是在判断是否使用了sasl(其实我不懂就不误导大家了)。

最重要的一句话是connection.setupIOstreams();这才是某条具体的connection与server连接的过程,这也是Connection的核心部分,具体核心代码如下:

559         while (true) {
560           setupConnection();
561           InputStream inStream = NetUtils.getInputStream(socket);
562           OutputStream outStream = NetUtils.getOutputStream(socket);
563           writeRpcHeader(outStream);

602           this.in = new DataInputStream(new BufferedInputStream
603               (new PingInputStream(inStream)));
604           this.out = new DataOutputStream
605           (new BufferedOutputStream(outStream));
606           writeHeader();
607
608           // update last activity time
609           touch();
610
611           // start the receiver thread after the socket connection has been set up
612           start();
613           return;
614         }
其中 setupConnection();主要完成于具体的server连接,初始化Connection中的socket变量,后边两句就是获取socket上的输入输出流,下一句是通过输出流向server发送RPC的头信息,在后边的in和out的封装就是使得网络信息传输跟流文件的操作一样。一个不起眼的start()其实很重要,前边一直没有提其实Connection类是一个线程类,那么这里调用start,其实就是启动Connection的线程执行其中的run方法。ok,我们再来围观下这run方法。

739     public void run() {
740       if (LOG.isDebugEnabled())
741         LOG.debug(getName() + ": starting, having connections "
742             + connections.size());
743
744       while (waitForWork()) {//wait here for work - read or close connection
745         receiveResponse();
746       }
747
748       close();
749
750       if (LOG.isDebugEnabled())
751         LOG.debug(getName() + ": stopped, remaining connections "
752             + connections.size());
753     }
非常简短的代码,其实里面的核心代码就3句waitForWork()、receiveResponse();、close();我们来挨句看一下它们都在干些神马。

在开始详细介绍这些东东之前我觉着有必要从整体上介绍下Connection这类的结构,Connection包含的主要成员变量有

private InetSocketAddress server;
private final ConnectionId remoteId;
private Socket socket = null;
private DataInputStream in;
private DataOutputStream out;
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
其中socket是保存与server连接的信息,calls保存的是提交到这条connection上的call的集合,每一个call对应一个唯一的callId,这样查询结果返回是就能通过callId知道是哪个call请求的结果。前边也提到Connection其实是一个线程,这个线程启动之后会不断的检查calls这队列是否为空,如果不为空线程就会尝试读取数据(不为空说明有call的查询结果还没有返回)。

大体介绍了Connection类的结构后,我们看一下waitForWork()的工作过程

696     private synchronized boolean waitForWork() {
697       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
698         long timeout = maxIdleTime-
699               (System.currentTimeMillis()-lastActivity.get());
700         if (timeout>0) {
701           try {
702             wait(timeout);
703           } catch (InterruptedException e) {}
704         }
705       }
706
707       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
708         return true;
709       } else if (shouldCloseConnection.get()) {
710         return false;
711       } else if (calls.isEmpty()) { // idle connection closed or stopped
712         markClosed(null);
713         return false;
714       } else { // get stopped but there are still pending requests
715         markClosed((IOException)new IOException().initCause(
716             new InterruptedException()));
717         return false;
718       }
719     }
从代码可以看出它的功能就是判断calls是否为空,决定这条socket连接是否需要继续维持。接着来看receiveResponse();

799       try {
800         int id = in.readInt();                    // try to read an id
801
802         if (LOG.isDebugEnabled())
803           LOG.debug(getName() + " got value #" + id);
804
805         Call call = calls.get(id);
806
807         int state = in.readInt();     // read call status
808         if (state == Status.SUCCESS.state) {
809           Writable value = ReflectionUtils.newInstance(valueClass, conf);
810           value.readFields(in);                 // read value
811           call.setValue(value);
812           calls.remove(id);
813         } else if (state == Status.ERROR.state) {
814           call.setException(new RemoteException(WritableUtils.readString(in),
815                                                 WritableUtils.readString(in)));
816           calls.remove(id);
可以看出他的主要功能是读取server返回的查询结果,首先读取的callid,上边也提到通过callid从calls队列中取出对应的call,call对象同时保存查询的结果,最后将已经接收到结果的call从calls队列中删除。

好像整个client的过程就是大体这样一个流程,我的深度优先讲解,可能包您看晕了,说实话写的逻辑性真心差,但是感觉这个框架设计还是很美妙,自己从中学到了不少的东西,也希望你能够好好爵嚼下源代码,体会下大师们的设计。关于Server的分析请看下回分解!欢迎批评指正!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: