您的位置:首页 > 编程语言

带有HA功能的Hadoop Client端RPC实现原理与代码分析

2015-12-31 15:34 429 查看

带有HA功能的Hadoop Client端RPC实现原理与代码分析

Leave a reply

create命令是文件系统常见的命令之一,下面以HDFS中这个PRC命令为例,说明这个HDFS RPC在Client端的执行过程。
DistributedFileSystem.create()
     ->dfs.create()/DFSClient.create() 方法返回一个 DFSOutputStream
          ->DFSOutputStream.newStreamForCreate()返回一个DFSOutputStream
               ->new DFSOutputStream()
                    ->DFSClient.namenode.create()
在此之后,create命令传递给RPC在client端的代理DFSClient.namenode。
final ClientProtocol namenode;
说明namenode实现的接口是ClientProtocol,也就是Client与NameNode之间RPC通信的协议。
注意:由于NameNode-HA机制下,client端的底层代理是有多个的,分别连接Active和Standby的NN,我们称这两个代理为底层代理。但是我们对用户调用需要呈现统一的接口,那么就出现了一个上层代理来统一这个接口。所有由用户来的调用都是先到达上层代理,通过上层代理转发到下层代理。同时上层代理还会根据底层代理返回的Exception来决定Failover或者Retry等操作。在HDFS中上层代理是通过调用RetryProxy.create()函数生成的(函数里面调用Proxy.newProxyInstance(),下面会讲到)。底层这两个代理对象是包装在FailoverProxyProvider类中的(目前默认的该类的实现是ConfiguredFailoverProxyProvider),由它负责管理这两个代理。当上层代理接收到来自用户的一个RPC命令之后,转发给当前正在使用的底层代理(由ConfiguredFailoverProxyProvider.currentProxyIndex决定)执行,然后看是否抛出异常。如果抛出了异常,根据异常的种类来判断是执行failover,还是retry,或者两者都不做。
这里这个namenode变量实际上是个上层代理。作为DFSClient的成员变量是怎么生成的呢?看DFSClient的构造函数:

if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}

从这段代码中可以看出,如果在构造DFSClient参数中没有给定rpcNamenode变量,那么就走esle路径,通过NameNodeProxies.createProxy()函数生成一个ProxyAndInfo<ClientProtocol>类型的变量,这个变量里面包装了一个上层代理对象和相应的Delegation Token Service(安全机制)。这个上层代理就是直接与client端用户代码打交道的代理。
在这个NameNodeProxies.createProxy()函数中:
(1)首先会创建FailoverProxyProvider,在配置了HA机制的集群中,目前默认的是ConfiguredFailoverProxyProvider类。ConfiguredFailoverProxyProvider类中有个成员变量List<AddressRpcProxyPair<T>> proxies,这个数组中分别存放两个连接到NameNode-HA机制中Active NN和Standby NN的URI的底层代理,在下层代理发送RPC请求返回某些失败信息可以断定此代理连接的NN
fail时,执行performFailover()把当前使用的底层代理切换到和另外一个NN连接的底层代理上。在创建这个ConfiguredFailoverProxyProvider对象时,相应的底层代理还没有被填充。
(2)调用RetryProxy.create()函数,生成上层代理对象。在这里用到了Java的反射和代理机制:
public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler(proxyProvider, retryPolicy)
);
}

这个create函数返回了一个上层代理对象,代理对象实现了ClientProtocol接口。那么用户就可以通过这个代理对象调ClientProtocol接口中相应的方法。根据Java的动态代理机制,用户对这个代理对象的方法调用都转换为对RetryInvocationHandler(proxyProvider, retryPolicy)对象中invoke()方法的调用了。RetryInvocationHandler是与FailoverProxyProvider密切相关的,因为它需要FailoverProxyProvider提供底层代理的支持。
下面看看RetryInvocationHanlder.invoke(Object proxy,Method method, Object[] args)的实现:
(1)参数proxy是代理对象,也就是前面Proxy.newProxyInstance()生成的对象,就是我们的上层代理对象。不过目前好像没用到。
(2)首先要获取一个RetryPolicy,默认的策略是在构造RetryInvocationHandler时的参数。
在Client与NameNode之间的ClientProtocol的RetryPolicy是
RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL ,
              config.maxFailoverAttempts, config.failoverSleepBaseMillis,
              config.failoverSleepMaxMillis)
(3)执行方法调用
Object ret = invokeMethod (method, args);
     -> method.invoke(currentProxy, args);
currentProxy是由FailoverProxyProvider提供的现在正在使用的底层代理。当NN发生主从切换的时候,这个currentProxy也会发生相应的变化。
(4)后面就是一大堆的异常处理,如果在调用相应方法的过程中出现了异常怎么办?
首先根据生成动态代理(上层代理)的时候给定的RetryPolicy策略,判断是不是应该Retry。我们这里的RetryPolicy是FailoverOnNetworkExceptionRetry,所以调用对应的shouldRetry()函数。
(4.1)如果Retry的次数已经超过最大尝试的次数了,那么就返回一个
RetryAction.RetryDecision.FAIL的RetryAction
(4.2) 如果抛出的异常是ConnectionException,NoRouteToHostException,UnKnownHostException,StandbyException中的一个,说明底层代理在RPC过程中Active NN连不上或者宕机或者已经发生主从切换了,那么就需要返回一个RetryAction.RetryDecision.FAILOVER_AND_RETRY的RetryAction,需要由相应的FailoverProxyProvider执行performFailover()操作,然后用另外一个NN的底层代理重试。
(4.3)如果抛出的异常是SocketException,或者是非RemoteException的IOException,那么就无法判断这个RPC命令到底是不是执行成功了。可能是本地的Socket或者IO出问题,也可能是NN端的Socket或者IO问题。那么现在采取的办法就是:如果RPC命令是idempotent的,也就是多次执行是没有副作用的(关于哪些RPC命令是idempotent,可以参考https://issues.apache.org/jira/browse/HDFS-2393 ),那么就连接另外的一个底层代理试试;如果RPC命令不是idempotent,只能返回失败了。
到此为止上层代理的执行流程就很清楚了,那么底层代理是怎么生成的呢?
我们知道底层代理是包装在了ConfiguredFailoverProxyProvider 这个类里面,生成底层代理是通过ConfiguredFailoverProxyProvider.getProxy()中生成的。
public synchronized T getProxy() {
AddressRpcProxyPair current = proxies.get(currentProxyIndex );
if (current.namenode == null) {
try {
current.namenode = NameNodeProxies.createNonHAProxy( conf,
current.address, xface, ugi, false ).getProxy();
} catch (IOException e) {
LOG.error( "Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);
}
}
return (T)current.namenode;
}

也就是分别连接Active和Standby NN的底层代理其实是借用了非HA机制中生成代理的方法createNonHAProxy().
NameNodeProxies.createNonHAProxy()
     ->NameNodeProxies.createNNProxyWithClientProtocol()
          ->RPC.getProtocolProxy()
               ->把上一步获取的实现了ClientNamenodeProtocolPB接口的代理对象封装成ClientNamenodeProtocolTranslatorPB对象,作用和server端一样,都是进行PB格式数据的序列化和反序列化工作。
不得不说的是,在RPC.getProtocolProxy()函数层层调用之后会进入 ProtobufRpcEngine.getProxy()函数,在这个函数里面又是通过Java的动态代理机制实现的。
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {

final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false );
}

从中我们可以看出,在NN-HA机制下的RPC client端代理中,使用了两次Java的动态代理机制,分别是在上层代理和底层代理中。通过动态代理机制,把RPC请求集中到代理对象处理方法中,实现了多种方法的同一处理流程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: