带有HA功能的Hadoop Client端RPC实现原理与代码分析
2015-12-31 15:34
429 查看
带有HA功能的Hadoop Client端RPC实现原理与代码分析
Leave a replycreate命令是文件系统常见的命令之一,下面以HDFS中这个PRC命令为例,说明这个HDFS RPC在Client端的执行过程。
DistributedFileSystem.create()
->dfs.create()/DFSClient.create() 方法返回一个 DFSOutputStream
->DFSOutputStream.newStreamForCreate()返回一个DFSOutputStream
->new DFSOutputStream()
->DFSClient.namenode.create()
在此之后,create命令传递给RPC在client端的代理DFSClient.namenode。
final ClientProtocol namenode;
说明namenode实现的接口是ClientProtocol,也就是Client与NameNode之间RPC通信的协议。
注意:由于NameNode-HA机制下,client端的底层代理是有多个的,分别连接Active和Standby的NN,我们称这两个代理为底层代理。但是我们对用户调用需要呈现统一的接口,那么就出现了一个上层代理来统一这个接口。所有由用户来的调用都是先到达上层代理,通过上层代理转发到下层代理。同时上层代理还会根据底层代理返回的Exception来决定Failover或者Retry等操作。在HDFS中上层代理是通过调用RetryProxy.create()函数生成的(函数里面调用Proxy.newProxyInstance(),下面会讲到)。底层这两个代理对象是包装在FailoverProxyProvider类中的(目前默认的该类的实现是ConfiguredFailoverProxyProvider),由它负责管理这两个代理。当上层代理接收到来自用户的一个RPC命令之后,转发给当前正在使用的底层代理(由ConfiguredFailoverProxyProvider.currentProxyIndex决定)执行,然后看是否抛出异常。如果抛出了异常,根据异常的种类来判断是执行failover,还是retry,或者两者都不做。
这里这个namenode变量实际上是个上层代理。作为DFSClient的成员变量是怎么生成的呢?看DFSClient的构造函数:
if (rpcNamenode != null) { // This case is used for testing. Preconditions.checkArgument(nameNodeUri == null); this.namenode = rpcNamenode; dtService = null; } else { Preconditions.checkArgument(nameNodeUri != null, "null URI"); NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); }
从这段代码中可以看出,如果在构造DFSClient参数中没有给定rpcNamenode变量,那么就走esle路径,通过NameNodeProxies.createProxy()函数生成一个ProxyAndInfo<ClientProtocol>类型的变量,这个变量里面包装了一个上层代理对象和相应的Delegation Token Service(安全机制)。这个上层代理就是直接与client端用户代码打交道的代理。
在这个NameNodeProxies.createProxy()函数中:
(1)首先会创建FailoverProxyProvider,在配置了HA机制的集群中,目前默认的是ConfiguredFailoverProxyProvider类。ConfiguredFailoverProxyProvider类中有个成员变量List<AddressRpcProxyPair<T>> proxies,这个数组中分别存放两个连接到NameNode-HA机制中Active NN和Standby NN的URI的底层代理,在下层代理发送RPC请求返回某些失败信息可以断定此代理连接的NN
fail时,执行performFailover()把当前使用的底层代理切换到和另外一个NN连接的底层代理上。在创建这个ConfiguredFailoverProxyProvider对象时,相应的底层代理还没有被填充。
(2)调用RetryProxy.create()函数,生成上层代理对象。在这里用到了Java的反射和代理机制:
public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { return Proxy.newProxyInstance( proxyProvider.getInterface().getClassLoader(), new Class<?>[] { iface }, new RetryInvocationHandler(proxyProvider, retryPolicy) ); }
这个create函数返回了一个上层代理对象,代理对象实现了ClientProtocol接口。那么用户就可以通过这个代理对象调ClientProtocol接口中相应的方法。根据Java的动态代理机制,用户对这个代理对象的方法调用都转换为对RetryInvocationHandler(proxyProvider, retryPolicy)对象中invoke()方法的调用了。RetryInvocationHandler是与FailoverProxyProvider密切相关的,因为它需要FailoverProxyProvider提供底层代理的支持。
下面看看RetryInvocationHanlder.invoke(Object proxy,Method method, Object[] args)的实现:
(1)参数proxy是代理对象,也就是前面Proxy.newProxyInstance()生成的对象,就是我们的上层代理对象。不过目前好像没用到。
(2)首先要获取一个RetryPolicy,默认的策略是在构造RetryInvocationHandler时的参数。
在Client与NameNode之间的ClientProtocol的RetryPolicy是
RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL ,
config.maxFailoverAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis)
(3)执行方法调用
Object ret = invokeMethod (method, args);
-> method.invoke(currentProxy, args);
currentProxy是由FailoverProxyProvider提供的现在正在使用的底层代理。当NN发生主从切换的时候,这个currentProxy也会发生相应的变化。
(4)后面就是一大堆的异常处理,如果在调用相应方法的过程中出现了异常怎么办?
首先根据生成动态代理(上层代理)的时候给定的RetryPolicy策略,判断是不是应该Retry。我们这里的RetryPolicy是FailoverOnNetworkExceptionRetry,所以调用对应的shouldRetry()函数。
(4.1)如果Retry的次数已经超过最大尝试的次数了,那么就返回一个
RetryAction.RetryDecision.FAIL的RetryAction
(4.2) 如果抛出的异常是ConnectionException,NoRouteToHostException,UnKnownHostException,StandbyException中的一个,说明底层代理在RPC过程中Active NN连不上或者宕机或者已经发生主从切换了,那么就需要返回一个RetryAction.RetryDecision.FAILOVER_AND_RETRY的RetryAction,需要由相应的FailoverProxyProvider执行performFailover()操作,然后用另外一个NN的底层代理重试。
(4.3)如果抛出的异常是SocketException,或者是非RemoteException的IOException,那么就无法判断这个RPC命令到底是不是执行成功了。可能是本地的Socket或者IO出问题,也可能是NN端的Socket或者IO问题。那么现在采取的办法就是:如果RPC命令是idempotent的,也就是多次执行是没有副作用的(关于哪些RPC命令是idempotent,可以参考https://issues.apache.org/jira/browse/HDFS-2393 ),那么就连接另外的一个底层代理试试;如果RPC命令不是idempotent,只能返回失败了。
到此为止上层代理的执行流程就很清楚了,那么底层代理是怎么生成的呢?
我们知道底层代理是包装在了ConfiguredFailoverProxyProvider 这个类里面,生成底层代理是通过ConfiguredFailoverProxyProvider.getProxy()中生成的。
public synchronized T getProxy() { AddressRpcProxyPair current = proxies.get(currentProxyIndex ); if (current.namenode == null) { try { current.namenode = NameNodeProxies.createNonHAProxy( conf, current.address, xface, ugi, false ).getProxy(); } catch (IOException e) { LOG.error( "Failed to create RPC proxy to NameNode", e); throw new RuntimeException(e); } } return (T)current.namenode; }
也就是分别连接Active和Standby NN的底层代理其实是借用了非HA机制中生成代理的方法createNonHAProxy().
NameNodeProxies.createNonHAProxy()
->NameNodeProxies.createNNProxyWithClientProtocol()
->RPC.getProtocolProxy()
->把上一步获取的实现了ClientNamenodeProtocolPB接口的代理对象封装成ClientNamenodeProtocolTranslatorPB对象,作用和server端一样,都是进行PB格式数据的序列化和反序列化工作。
不得不说的是,在RPC.getProtocolProxy()函数层层调用之后会进入 ProtobufRpcEngine.getProxy()函数,在这个函数里面又是通过Java的动态代理机制实现的。
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy ) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false ); }
从中我们可以看出,在NN-HA机制下的RPC client端代理中,使用了两次Java的动态代理机制,分别是在上层代理和底层代理中。通过动态代理机制,把RPC请求集中到代理对象处理方法中,实现了多种方法的同一处理流程。
相关文章推荐
- 开始学习Python
- java反射
- 序列化--关于 Java 对象序列化您不知道的 5 件事
- VisualC# winform窗体应用程序 语句加this有的怎么不加?
- python实现嵌套列表、字典按某一元素去重复
- Java实战应用:MyBatis实现单表的增删改
- Java API编写
- Spring——setter方式注入和构造函数方式注入
- 中文编码--深入分析 Java 中的中文编码问题
- Spring auto wire(自动装配) 的 五种方式
- python 小实例 斐波那契数列
- Python学习笔记——条件判断与循环
- java7.instance of关键字
- excel_VB宏脚本_批量生成点餐宝接受的格式
- ginx+php-fpm实现原理及问题记录
- asp.net mvc 无刷新加载
- python安装软件出现错误 fatal error: 'libxml/xmlversion.h' file not found
- (转)C#根据当前时间获取周,月,季度,年度等时间段的起止时间
- java final关键字
- java.lang.NoClassDefFoundError: Could not initialize class sun.awt.X11GraphicsEnvironment求大神解答