HDFS1.0源代码解析—Hadoop的RPC机制之Client解析
2012-07-07 00:38
483 查看
好久没有更新Hadoop相关的博客了,实在是各种压力缠身,各种东西都没准备好,面对即将找工作有点没有了节奏。
ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。
开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。
Hadoop中的RPC比较难理解的部分就是动态代理的实现,下边引用一个其他人博客里的图片(/article/1765800.html),感觉这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。
结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析
在DataNode.java的startDataNode函数中有这样一个调用
下边看一下RPC中的相关实现:
首先看一下waitForProxy函数干了些什么
前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下
下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:
接着我来看getConnection(remoteId, call);干了些什么,核心代码如下:
从代码中我们可以看出在Connection类中有一个connections的变量保存到不同节点的connection(类似一个连接池,其实保存的就是一坨connection对象,具体连接的信息在connection对象的内部)。其中关键的部分是connection对象的生成,这部分的代码比较多,简单的介绍下过程大家结合理解代码,其中主要的部分是在判断是否使用了sasl(其实我不懂就不误导大家了)。
最重要的一句话是connection.setupIOstreams();这才是某条具体的connection与server连接的过程,这也是Connection的核心部分,具体核心代码如下:
在开始详细介绍这些东东之前我觉着有必要从整体上介绍下Connection这类的结构,Connection包含的主要成员变量有
大体介绍了Connection类的结构后,我们看一下waitForWork()的工作过程
好像整个client的过程就是大体这样一个流程,我的深度优先讲解,可能包您看晕了,说实话写的逻辑性真心差,但是感觉这个框架设计还是很美妙,自己从中学到了不少的东西,也希望你能够好好爵嚼下源代码,体会下大师们的设计。关于Server的分析请看下回分解!欢迎批评指正!
ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。
开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。
Hadoop中的RPC比较难理解的部分就是动态代理的实现,下边引用一个其他人博客里的图片(/article/1765800.html),感觉这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。
结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析
在DataNode.java的startDataNode函数中有这样一个调用
348 this.namenode = (DatanodeProtocol) 349 RPC.waitForProxy(DatanodeProtocol.class, 350 DatanodeProtocol.versionID, 351 nameNodeAddr, 352 conf);我们可以看到通过调用RPC的waitForProxy方法创建了一个DatanodeProtocol对象namenode(其中DatanodeProtocol实现了接口VersionedProtocol),那么namenode应该就是前边提到的匿名实现类的对象,通过这个对象就可以与NN进行交互了。
下边看一下RPC中的相关实现:
首先看一下waitForProxy函数干了些什么
329 while (true) { 330 try { 331 return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); 332 } catch(ConnectException se) { // namenode has not been started很明显waitForProxy是保证NN出现一些意外的情况下,我们还是可以获得相应的交互对象。那么接着来看getProxy的作用:
392 VersionedProtocol proxy = 393 (VersionedProtocol) Proxy.newProxyInstance( 394 protocol.getClassLoader(), new Class[] { protocol }, 395 new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));这里才是生成RPC对象的入口(姑且就称作RPC对象),前两个参数 protocol.getClassLoader(), new Class[] { protocol },比较好理解,一个是获得相应的classloader(因为需要在运行是动态的生成对象),第二个就是说明相应的接口类。可能现在最感兴趣的是第三个参数new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout),我们前边提到过 Proxy需要与InvocationHandler相结合才能达到想要的效果,那么第三个参数到底是不是InvocationHandler的对象呢,我们看一下Invoke类的相关实现。
203 private static class Invoker implements InvocationHandler {果然没有令我们失望,这与前面的分析完全吻合。
208 public Invoker(Class<? extends VersionedProtocol> protocol, 209 InetSocketAddress address, UserGroupInformation ticket, 210 Configuration conf, SocketFactory factory, 211 int rpcTimeout) throws IOException { 212 this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, 213 ticket, rpcTimeout, conf); 214 this.client = CLIENTS.getClient(conf, factory); 215 }看一下Invoke构造函数主要完成的工作,初始化一个 Client.ConnectionId remoteId;,可以看出这个对象主要封装了服务器地址,代理接口的类型以及连接服务器时用到的一些参数。另外一个关键的对象是client,就是同感client对象与NN进行交互。
前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下
225 ObjectWritable value = (ObjectWritable) 226 client.call(new Invocation(method, args), remoteId);代码很简短,主要就是调用了client的call方法。其中比较感兴趣的参数应该是new Invocation(method, args)这个Invocation的对象,其实从字面意思我们就可以理解到这个对象封装是具体调用的方法和参数(这些信息需要传递到服务器端,执行服务器端相应的方法)。其实这个Invocation类实现了Writable接口,熟悉源代码的童鞋应该知道这是Hadoop序列化和反序列话的一个接口,作用就明显了,那就是方便把这些信息以序列化的方式传递到服务器端。
下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:
1043 public Writable call(Writable param, ConnectionId remoteId) 1044 throws InterruptedException, IOException { 1045 Call call = new Call(param); 1046 Connection connection = getConnection(remoteId, call); 1047 connection.sendParam(call); // send the parameter 1048 boolean interrupted = false; 1049 synchronized (call) { 1050 while (!call.done) { 1051 try { 1052 call.wait(); // wait for the result 1053 } catch (InterruptedException ie) { 1054 // save the fact that we were interrupted 1055 interrupted = true; 1056 } 1057 }首先我们可以看到传递给服务器的被序列化后的参数被封装成了一个Call类型的对象,很明显我们就要问一下,你会什么有这么做呢?答案是这样的,call是一个设计很巧妙的类,它封装了查询的信息和查询的结果,阻塞查询线程(查询过程是一个同步的过程)。
接着我来看getConnection(remoteId, call);干了些什么,核心代码如下:
1188 do { 1189 synchronized (connections) { 1190 connection = connections.get(remoteId); 1191 if (connection == null) { 1192 connection = new Connection(remoteId); 1193 connections.put(remoteId, connection); 1194 } 1195 } 1196 } while (!connection.addCall(call)); 1197 1198 //we don't invoke the method below inside "synchronized (connections)" 1199 //block above. The reason for that is if the server happens to be slow, 1200 //it will take longer to establish a connection and that will slow the 1201 //entire system down. 1202 connection.setupIOstreams();
从代码中我们可以看出在Connection类中有一个connections的变量保存到不同节点的connection(类似一个连接池,其实保存的就是一坨connection对象,具体连接的信息在connection对象的内部)。其中关键的部分是connection对象的生成,这部分的代码比较多,简单的介绍下过程大家结合理解代码,其中主要的部分是在判断是否使用了sasl(其实我不懂就不误导大家了)。
最重要的一句话是connection.setupIOstreams();这才是某条具体的connection与server连接的过程,这也是Connection的核心部分,具体核心代码如下:
559 while (true) { 560 setupConnection(); 561 InputStream inStream = NetUtils.getInputStream(socket); 562 OutputStream outStream = NetUtils.getOutputStream(socket); 563 writeRpcHeader(outStream); 602 this.in = new DataInputStream(new BufferedInputStream 603 (new PingInputStream(inStream))); 604 this.out = new DataOutputStream 605 (new BufferedOutputStream(outStream)); 606 writeHeader(); 607 608 // update last activity time 609 touch(); 610 611 // start the receiver thread after the socket connection has been set up 612 start(); 613 return; 614 }其中 setupConnection();主要完成于具体的server连接,初始化Connection中的socket变量,后边两句就是获取socket上的输入输出流,下一句是通过输出流向server发送RPC的头信息,在后边的in和out的封装就是使得网络信息传输跟流文件的操作一样。一个不起眼的start()其实很重要,前边一直没有提其实Connection类是一个线程类,那么这里调用start,其实就是启动Connection的线程执行其中的run方法。ok,我们再来围观下这run方法。
739 public void run() { 740 if (LOG.isDebugEnabled()) 741 LOG.debug(getName() + ": starting, having connections " 742 + connections.size()); 743 744 while (waitForWork()) {//wait here for work - read or close connection 745 receiveResponse(); 746 } 747 748 close(); 749 750 if (LOG.isDebugEnabled()) 751 LOG.debug(getName() + ": stopped, remaining connections " 752 + connections.size()); 753 }非常简短的代码,其实里面的核心代码就3句waitForWork()、receiveResponse();、close();我们来挨句看一下它们都在干些神马。
在开始详细介绍这些东东之前我觉着有必要从整体上介绍下Connection这类的结构,Connection包含的主要成员变量有
private InetSocketAddress server; private final ConnectionId remoteId; private Socket socket = null; private DataInputStream in; private DataOutputStream out; private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();其中socket是保存与server连接的信息,calls保存的是提交到这条connection上的call的集合,每一个call对应一个唯一的callId,这样查询结果返回是就能通过callId知道是哪个call请求的结果。前边也提到Connection其实是一个线程,这个线程启动之后会不断的检查calls这队列是否为空,如果不为空线程就会尝试读取数据(不为空说明有call的查询结果还没有返回)。
大体介绍了Connection类的结构后,我们看一下waitForWork()的工作过程
696 private synchronized boolean waitForWork() { 697 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { 698 long timeout = maxIdleTime- 699 (System.currentTimeMillis()-lastActivity.get()); 700 if (timeout>0) { 701 try { 702 wait(timeout); 703 } catch (InterruptedException e) {} 704 } 705 } 706 707 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { 708 return true; 709 } else if (shouldCloseConnection.get()) { 710 return false; 711 } else if (calls.isEmpty()) { // idle connection closed or stopped 712 markClosed(null); 713 return false; 714 } else { // get stopped but there are still pending requests 715 markClosed((IOException)new IOException().initCause( 716 new InterruptedException())); 717 return false; 718 } 719 }从代码可以看出它的功能就是判断calls是否为空,决定这条socket连接是否需要继续维持。接着来看receiveResponse();
799 try { 800 int id = in.readInt(); // try to read an id 801 802 if (LOG.isDebugEnabled()) 803 LOG.debug(getName() + " got value #" + id); 804 805 Call call = calls.get(id); 806 807 int state = in.readInt(); // read call status 808 if (state == Status.SUCCESS.state) { 809 Writable value = ReflectionUtils.newInstance(valueClass, conf); 810 value.readFields(in); // read value 811 call.setValue(value); 812 calls.remove(id); 813 } else if (state == Status.ERROR.state) { 814 call.setException(new RemoteException(WritableUtils.readString(in), 815 WritableUtils.readString(in))); 816 calls.remove(id);可以看出他的主要功能是读取server返回的查询结果,首先读取的callid,上边也提到通过callid从calls队列中取出对应的call,call对象同时保存查询的结果,最后将已经接收到结果的call从calls队列中删除。
好像整个client的过程就是大体这样一个流程,我的深度优先讲解,可能包您看晕了,说实话写的逻辑性真心差,但是感觉这个框架设计还是很美妙,自己从中学到了不少的东西,也希望你能够好好爵嚼下源代码,体会下大师们的设计。关于Server的分析请看下回分解!欢迎批评指正!
相关文章推荐
- HDFS1.0源代码解析—Hadoop的RPC机制之Server端解析
- Hadoop_HDFS文件读写代码流程解析和副本存放机制
- 【Hadoop】HDFS笔记(一):Hadoop的RPC机制
- HDFS1.0源代码解析—DataNode启动(一)
- Hadoop源码分析HDFS Client向HDFS写入数据的过程解析
- HDFS1.0源代码解析—DataNode类主要数据成员和函数
- Hadoop之——HDFS的RPC机制
- HDFS1.0源代码解析—数据传输和接受的类BlockSender和BlockReceiver
- HDFS1.0源代码解析—DataNode启动(二)
- hadoop中HDFS读写机制解析
- Hadoop源码解析之 rpc通信 client到server通信
- Hadoop源码解析之 rpc通信 client到server通信
- Hadoop基础之RPC机制以及HDFS源码分析
- Hadoop详解(二)——HDFS的命令,执行过程,Java接口,原理详解。RPC机制
- Hadoop中RPC机制详解之Client端
- Hadoop源码分析HDFS Client向HDFS写入数据的过程解析
- HDFS1.0源代码解析—DataNode启动(三)
- HDFS1.0源代码解析—DataNode数据接收线程DataXceiverServer与DataXceiver解析
- Hadoop的RPC机制_Client组件
- 大数据之路-Hadoop-5-HDFS原理解析及NameNode、DataNode工作机制