
Hadoop 2.6 source code walkthrough: NameNodeRpcServer startup and request handling

2018-04-12 09:45

NameNodeRpcServer starts listening

clientRpcServer handles RPC requests from HDFS clients.

serviceRpcServer handles RPC requests from DataNodes.

In addition, NameNodeRpcServer implements the various protocol interfaces.

/**
 * Start client and service RPC servers.
 */
void start() {
  clientRpcServer.start();
  if (serviceRpcServer != null) {
    serviceRpcServer.start();
  }
}


Here is how clientRpcServer is constructed:

this.clientRpcServer = new RPC.Builder(conf)
    .setProtocol(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
    .setInstance(clientNNPbService).setBindAddress(bindHost)
    .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
    .setVerbose(false)
    .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();


The build method

public Server build() throws IOException, HadoopIllegalArgumentException {
  ......

  return getProtocolEngine(this.protocol, this.conf).getServer(
      this.protocol, this.instance, this.bindAddress, this.port,
      this.numHandlers, this.numReaders, this.queueSizePerHandler,
      this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
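
The setter chain and build() above follow the familiar fluent-builder shape. The following is a minimal JDK-only sketch of that shape, not Hadoop's RPC.Builder: the class names, the default values, and the port 8020 used in main are assumptions chosen for illustration.

```java
/** Toy fluent builder, loosely modeled on the call chain above (not Hadoop code). */
final class ServerBuilderSketch {

    /** Immutable result of build(); a stand-in for the real server object. */
    static final class Server {
        final Object instance;
        final String bindAddress;
        final int port;
        final int numHandlers;

        Server(Object instance, String bindAddress, int port, int numHandlers) {
            this.instance = instance;
            this.bindAddress = bindAddress;
            this.port = port;
            this.numHandlers = numHandlers;
        }
    }

    static final class Builder {
        private Object instance;                 // required; checked in build()
        private String bindAddress = "0.0.0.0";  // assumed default for this sketch
        private int port = 0;                    // assumed default for this sketch
        private int numHandlers = 1;             // assumed default for this sketch

        // Each setter returns this so that calls can be chained.
        Builder setInstance(Object instance) { this.instance = instance; return this; }
        Builder setBindAddress(String bindAddress) { this.bindAddress = bindAddress; return this; }
        Builder setPort(int port) { this.port = port; return this; }
        Builder setNumHandlers(int numHandlers) { this.numHandlers = numHandlers; return this; }

        /** Validates the collected settings and creates the server object. */
        Server build() {
            if (instance == null) {
                throw new IllegalArgumentException("instance is not set");
            }
            return new Server(instance, bindAddress, port, numHandlers);
        }
    }

    public static void main(String[] args) {
        Server s = new Builder()
            .setInstance("clientNNPbService")   // any object works in this toy version
            .setBindAddress("127.0.0.1")
            .setPort(8020)
            .setNumHandlers(10)
            .build();
        System.out.println(s.bindAddress + ":" + s.port + " handlers=" + s.numHandlers);
    }
}
```

Deferring validation to build() keeps the setters trivial and lets the caller set options in any order.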


The getServer method

It returns a ProtobufRpcEngine.Server object.

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
    String bindAddress, int port, int numHandlers, int numReaders,
    int queueSizePerHandler, boolean verbose, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig)
    throws IOException {
  return new Server(protocol, protocolImpl, conf, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
      portRangeConfig);
}


serviceRpcServer is constructed the same way as clientRpcServer.

Implementation of clientRpcServer.start()

/** Starts the service.  Must be called before any calls will be handled. */
public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];

  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
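
start() above creates handlerCount Handler threads. As a rough illustration of what such a pool does once it is running, the JDK-only sketch below assumes each handler simply blocks on a shared queue and processes whatever it takes. HandlerPoolSketch, runCalls, and the string "calls" are hypothetical; Hadoop's Handler, Listener, and Responder do considerably more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of handler threads draining a shared call queue (not Hadoop code). */
final class HandlerPoolSketch {

    /** Starts handlerCount threads, feeds them the calls, and returns the sorted responses. */
    static List<String> runCalls(int handlerCount, List<String> calls) {
        final BlockingQueue<String> callQueue = new LinkedBlockingQueue<>();
        final List<String> responses = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(calls.size());

        Thread[] handlers = new Thread[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String call = callQueue.take();   // blocks until a call is queued
                        responses.add("handled:" + call); // stand-in for the real work
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // interrupt is the stop signal
                }
            }, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }

        boolean finished;
        try {
            for (String call : calls) {
                callQueue.put(call);                      // plays the role of the listener side
            }
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            for (Thread h : handlers) {
                h.interrupt();                            // stop the handlers
            }
        }
        if (!finished) {
            throw new IllegalStateException("handlers did not drain the queue in time");
        }

        // Handlers run concurrently, so sort to get a deterministic order.
        List<String> sorted = new ArrayList<>(responses);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(runCalls(3, Arrays.asList("getBlockLocations", "create", "mkdirs")));
    }
}
```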


BlockingService

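One additional note:

Compiling ClientNamenodeProtocol.proto generates ClientNamenodeProtocolProtos.java.

That file contains the static class ClientNamenodeProtocol, shown below. (In Java, a class can be declared static only when it is a nested class.)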

public static abstract class ClientNamenodeProtocol
    implements com.google.protobuf.Service {
  protected ClientNamenodeProtocol() {}
  ...
}


ClientNamenodeProtocol has a static method, newReflectiveBlockingService:

public static com.google.protobuf.BlockingService
    newReflectiveBlockingService(final BlockingInterface impl) {
  return new com.google.protobuf.BlockingService() {
    public final com.google.protobuf.Descriptors.ServiceDescriptor
        getDescriptorForType() {
      return getDescriptor();
    }


The anonymous BlockingService returned by newReflectiveBlockingService implements the callBlockingMethod method:

public final com.google.protobuf.Message callBlockingMethod(
    com.google.protobuf.Descriptors.MethodDescriptor method,
    com.google.protobuf.RpcController controller,
    com.google.protobuf.Message request)
    throws com.google.protobuf.ServiceException {
  if (method.getService() != getDescriptor()) {
    throw new java.lang.IllegalArgumentException(
        "Service.callBlockingMethod() given method descriptor for " +
        "wrong service type.");
  }
  switch(method.getIndex()) {
    case 0:
      return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);
    case 1:
      return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);
    case 2:
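
The generated switch above maps a method index to a call on impl. The JDK-only sketch below reproduces just that dispatch idea. Its two interfaces are simplified, hypothetical stand-ins for the generated BlockingInterface and for com.google.protobuf.BlockingService: plain strings replace protobuf messages, an int replaces the MethodDescriptor, and EchoImpl is invented for the demo.

```java
/** Toy version of index-based dispatch inside a blocking service (not generated code). */
final class IndexDispatchSketch {

    /** Hypothetical stand-in for the generated BlockingInterface. */
    interface BlockingInterface {
        String getBlockLocations(String request);
        String getServerDefaults(String request);
    }

    /** Hypothetical stand-in for com.google.protobuf.BlockingService. */
    interface BlockingService {
        String callBlockingMethod(int methodIndex, String request);
    }

    /** Wraps impl in a service that selects the target method by its index. */
    static BlockingService newReflectiveBlockingService(final BlockingInterface impl) {
        return new BlockingService() {
            @Override
            public String callBlockingMethod(int methodIndex, String request) {
                switch (methodIndex) {
                    case 0:
                        return impl.getBlockLocations(request);
                    case 1:
                        return impl.getServerDefaults(request);
                    default:
                        throw new IllegalArgumentException(
                            "unknown method index " + methodIndex);
                }
            }
        };
    }

    /** Demo implementation that only echoes its input. */
    static final class EchoImpl implements BlockingInterface {
        @Override
        public String getBlockLocations(String request) {
            return "blocks-of:" + request;
        }

        @Override
        public String getServerDefaults(String request) {
            return "server-defaults";
        }
    }

    public static void main(String[] args) {
        BlockingService service = newReflectiveBlockingService(new EchoImpl());
        System.out.println(service.callBlockingMethod(0, "/user/demo/file"));
        System.out.println(service.callBlockingMethod(1, ""));
    }
}
```

The caller only needs the service object and a method index, so the RPC layer can stay unaware of the concrete protocol it is serving.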


Inheritance relationships of the related classes

public interface ClientNamenodeProtocolPB extends
    ClientNamenodeProtocol.BlockingInterface {
}

public class ClientNamenodeProtocolServerSideTranslatorPB implements
    ClientNamenodeProtocolPB {
  ...
}


Constructing clientNNPbService, which interacts with HDFS clients

ClientNamenodeProtocolServerSideTranslatorPB
    clientProtocolServerTranslator =
        new ClientNamenodeProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = ClientNamenodeProtocol.
    newReflectiveBlockingService(clientProtocolServerTranslator);
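
The translator constructed above sits between the wire-level interface and the server object passed in as this. The JDK-only sketch below shows that adapter role under simplifying assumptions. NativeProtocol, WireProtocol, LengthRequest, LengthResponse, and the getFileLength operation are all hypothetical names invented for the example; the real translator converts generated protobuf messages.

```java
/** Toy server-side translator: unwraps a wire request, calls the native API, wraps the result. */
final class TranslatorSketch {

    /** Hypothetical native interface, in the role played by the server object. */
    interface NativeProtocol {
        long getFileLength(String path);
    }

    /** Hypothetical wire-level request (stand-in for a generated request message). */
    static final class LengthRequest {
        final String src;
        LengthRequest(String src) { this.src = src; }
    }

    /** Hypothetical wire-level response (stand-in for a generated response message). */
    static final class LengthResponse {
        final long length;
        LengthResponse(long length) { this.length = length; }
    }

    /** Hypothetical wire-level interface, in the role of the PB protocol interface. */
    interface WireProtocol {
        LengthResponse getFileLength(LengthRequest request);
    }

    /** Adapter: implements the wire interface by delegating to the native one. */
    static final class ServerSideTranslator implements WireProtocol {
        private final NativeProtocol server;

        ServerSideTranslator(NativeProtocol server) {
            this.server = server;
        }

        @Override
        public LengthResponse getFileLength(LengthRequest request) {
            long length = server.getFileLength(request.src); // unwrap and delegate
            return new LengthResponse(length);               // wrap the native result
        }
    }

    public static void main(String[] args) {
        // The lambda stands in for the real server; it reports the path's own length.
        WireProtocol translator = new ServerSideTranslator(path -> path.length());
        LengthResponse response = translator.getFileLength(new LengthRequest("/tmp"));
        System.out.println(response.length);
    }
}
```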


Inheritance chain

ipc.Server is extended by RPC.Server, which in turn is extended by ProtobufRpcEngine.Server.

Key code with which a server-side Handler thread processes a request

final Call call = callQueue.take(); // pop the queue; maybe blocked here
......
// Key step in Handler; call is an abstract method
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
    call.timestamp);


The call method invoked here is RPC.Server's call:

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
      receiveTime);
}


Registering the mapping between RPC_PROTOCOL_BUFFER and ProtoBufRpcInvoker

Static initializer block of the ProtobufRpcEngine class

static { // Register the rpcRequest deserializer for WritableRpcEngine
  org.apache.hadoop.ipc.Server.registerProtocolEngine(
      RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
      new Server.ProtoBufRpcInvoker());
}


Method of the Server class

registerProtocolEngine; pay particular attention to rpcKindMap

public static void registerProtocolEngine(RPC.RpcKind rpcKind,
    Class<? extends Writable> rpcRequestWrapperClass,
    RpcInvoker rpcInvoker) {
  RpcKindMapValue  old =
      rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
  if (old != null) {
    rpcKindMap.put(rpcKind, old);
    throw new IllegalArgumentException("ReRegistration of rpcKind: " +
        rpcKind);
  }
  LOG.debug("rpcKind=" + rpcKind +
      ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
      ", rpcInvoker=" + rpcInvoker);
}
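
registerProtocolEngine above inserts into rpcKindMap, then restores the previous entry and throws if the kind was already registered. The JDK-only sketch below mirrors that put-then-restore guard together with a lookup. It is a simplified illustration: the RpcKind enum, the RpcInvoker interface, and the lookup's error handling are assumptions, and the map is an instance field here so that each registry is isolated.

```java
import java.util.EnumMap;
import java.util.Map;

/** Toy registry that maps an RPC kind to its invoker and rejects re-registration. */
final class RpcKindRegistrySketch {

    /** Hypothetical subset of RPC kinds. */
    enum RpcKind { RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

    /** Hypothetical, string-based stand-in for the invoker interface. */
    interface RpcInvoker {
        String call(String protocol, String request);
    }

    private final Map<RpcKind, RpcInvoker> rpcKindMap = new EnumMap<>(RpcKind.class);

    /** Registers an invoker; a second registration for the same kind is an error. */
    synchronized void registerProtocolEngine(RpcKind rpcKind, RpcInvoker rpcInvoker) {
        RpcInvoker old = rpcKindMap.put(rpcKind, rpcInvoker);
        if (old != null) {
            rpcKindMap.put(rpcKind, old); // restore the original entry before failing
            throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
        }
    }

    /** Looks up the invoker for a kind (the failure mode is this sketch's own choice). */
    synchronized RpcInvoker getRpcInvoker(RpcKind rpcKind) {
        RpcInvoker invoker = rpcKindMap.get(rpcKind);
        if (invoker == null) {
            throw new IllegalArgumentException("no invoker registered for " + rpcKind);
        }
        return invoker;
    }

    public static void main(String[] args) {
        RpcKindRegistrySketch registry = new RpcKindRegistrySketch();
        registry.registerProtocolEngine(RpcKind.RPC_PROTOCOL_BUFFER,
            (protocol, request) -> "pb:" + protocol + ":" + request);

        // Dispatch by kind, as the handler does with the kind carried by each call.
        System.out.println(registry.getRpcInvoker(RpcKind.RPC_PROTOCOL_BUFFER)
            .call("ClientNamenodeProtocol", "ping"));

        try {
            registry.registerProtocolEngine(RpcKind.RPC_PROTOCOL_BUFFER,
                (protocol, request) -> "other");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```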


So the expression

getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime);

ultimately invokes

the call method of the ProtoBufRpcInvoker object, shown below:

public Writable call(RPC.Server server, String protocol,
    Writable writableRequest, long receiveTime) throws Exception {
  ...
  result = service.callBlockingMethod(methodDescriptor, null, param);
  ...
}


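Summary

For requests from an HDFS client:

NameNodeRpcServer starts the listener.

When a request arrives, a Handler thread processes it by calling the call method of the ProtobufRpcEngine.Server.ProtoBufRpcInvoker object.

That call method obtains the BlockingService object, which in turn invokes the corresponding interface method of ClientNamenodeProtocolServerSideTranslatorPB.

ClientNamenodeProtocolServerSideTranslatorPB finally calls the corresponding NameNodeRpcServer method, which answers the client's RPC request.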