Hadoop 2.6 source code reading: NameNodeRpcServer startup and request handling
2018-04-12 09:45
Starting the RPC servers and their listeners
clientRpcServer serves RPC requests from HDFS clients, while serviceRpcServer serves RPC requests from DataNodes and other HDFS services. (serviceRpcServer is only created when dfs.namenode.servicerpc-address is configured, which is why start() null-checks it.)
NameNodeRpcServer also implements the various protocol interfaces (ClientProtocol, DatanodeProtocol, NamenodeProtocol, and so on).
```java
/** Start client and service RPC servers. */
void start() {
  clientRpcServer.start();
  if (serviceRpcServer != null) {
    serviceRpcServer.start();
  }
}
```
Let's look at how clientRpcServer is constructed:
```java
this.clientRpcServer = new RPC.Builder(conf)
    .setProtocol(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
    .setInstance(clientNNPbService)
    .setBindAddress(bindHost)
    .setPort(rpcAddr.getPort())
    .setNumHandlers(handlerCount)
    .setVerbose(false)
    .setSecretManager(namesystem.getDelegationTokenSecretManager())
    .build();
```
The build() method:
```java
public Server build() throws IOException, HadoopIllegalArgumentException {
  // ...
  return getProtocolEngine(this.protocol, this.conf).getServer(
      this.protocol, this.instance, this.bindAddress, this.port,
      this.numHandlers, this.numReaders, this.queueSizePerHandler,
      this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
```
The getServer method returns a ProtobufRpcEngine.Server object:
```java
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
    String bindAddress, int port, int numHandlers, int numReaders,
    int queueSizePerHandler, boolean verbose, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig) throws IOException {
  return new Server(protocol, protocolImpl, conf, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
      portRangeConfig);
}
```
serviceRpcServer is constructed the same way as clientRpcServer.
The implementation of clientRpcServer.start():
```java
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
```
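The threading model above (one Responder, one Listener, and handlerCount Handler threads sharing a call queue) can be sketched with a minimal, self-contained toy. This is not Hadoop code; the class and field names are made up for the demo, and Strings stand in for Call objects:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of ipc.Server's threading: producers enqueue calls into a shared
// queue, and N handler threads block on take() and process them.
class MiniRpcServer {
  private static final String POISON = "POISON"; // demo-only shutdown marker

  private final BlockingQueue<String> callQueue = new ArrayBlockingQueue<>(100);
  private final Thread[] handlers;
  final AtomicInteger handled = new AtomicInteger();

  MiniRpcServer(int handlerCount) {
    handlers = new Thread[handlerCount];
  }

  void start() {
    for (int i = 0; i < handlers.length; i++) {
      handlers[i] = new Thread(() -> {
        try {
          for (;;) {
            String call = callQueue.take(); // blocks, like Handler.run()
            if (POISON.equals(call)) return;
            handled.incrementAndGet();      // the real call(...) would go here
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      handlers[i].start();
    }
  }

  void submit(String call) {
    try {
      callQueue.put(call); // a Listener/Reader thread would do this
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  void stopAndJoin() {
    for (int i = 0; i < handlers.length; i++) submit(POISON);
    for (Thread t : handlers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    MiniRpcServer s = new MiniRpcServer(3);
    s.start();
    for (int i = 0; i < 10; i++) s.submit("req-" + i);
    s.stopAndJoin();
    System.out.println(s.handled.get()); // prints 10
  }
}
```

Because the queue is FIFO and the poison markers are enqueued after all requests, every request is handled before the handlers exit.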
BlockingService
A quick aside: compiling the ClientNamenodeProtocol.proto file generates ClientNamenodeProtocolProtos.java.
That file contains the static nested class ClientNamenodeProtocol shown below. (In Java, the only kind of class that can be declared static is a static nested class.)
```java
public static abstract class ClientNamenodeProtocol
    implements com.google.protobuf.Service {
  protected ClientNamenodeProtocol() {}
  ...
}
```
ClientNamenodeProtocol provides a static method, newReflectiveBlockingService:
```java
public static com.google.protobuf.BlockingService
    newReflectiveBlockingService(final BlockingInterface impl) {
  return new com.google.protobuf.BlockingService() {
    public final com.google.protobuf.Descriptors.ServiceDescriptor
        getDescriptorForType() {
      return getDescriptor();
    }
```
The anonymous BlockingService returned there implements callBlockingMethod, which checks the service descriptor and then dispatches on the method's index:
```java
public final com.google.protobuf.Message callBlockingMethod(
    com.google.protobuf.Descriptors.MethodDescriptor method,
    com.google.protobuf.RpcController controller,
    com.google.protobuf.Message request)
    throws com.google.protobuf.ServiceException {
  if (method.getService() != getDescriptor()) {
    throw new java.lang.IllegalArgumentException(
        "Service.callBlockingMethod() given method descriptor for " +
        "wrong service type.");
  }
  switch (method.getIndex()) {
    case 0:
      return impl.getBlockLocations(controller,
          (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos
              .GetBlockLocationsRequestProto) request);
    case 1:
      return impl.getServerDefaults(controller,
          (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos
              .GetServerDefaultsRequestProto) request);
    case 2:
```
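The dispatch pattern in callBlockingMethod reduces to a small sketch. The mini types below are hypothetical stand-ins for the generated protobuf classes (Strings replace the request/response Message types), not the real API:

```java
// Toy version of a generated blocking service: dispatch by method index,
// delegating each index to the corresponding method of the impl.
interface MiniBlockingInterface {
  String getBlockLocations(String request);
  String getServerDefaults(String request);
}

class MiniBlockingService {
  private final MiniBlockingInterface impl;

  MiniBlockingService(MiniBlockingInterface impl) {
    this.impl = impl;
  }

  // stands in for callBlockingMethod(methodDescriptor, controller, request)
  String callBlockingMethod(int methodIndex, String request) {
    switch (methodIndex) {
      case 0: return impl.getBlockLocations(request);
      case 1: return impl.getServerDefaults(request);
      default:
        throw new IllegalArgumentException("unknown method index: " + methodIndex);
    }
  }

  public static void main(String[] args) {
    MiniBlockingService svc = new MiniBlockingService(new MiniBlockingInterface() {
      public String getBlockLocations(String req) { return "locations(" + req + ")"; }
      public String getServerDefaults(String req) { return "defaults"; }
    });
    System.out.println(svc.callBlockingMethod(0, "/user/a/f.txt"));
    // prints locations(/user/a/f.txt)
  }
}
```

In the generated code the index comes from the method's position in the .proto service definition, which is why getBlockLocations is case 0.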
The inheritance relationships of the related classes:
```java
public interface ClientNamenodeProtocolPB
    extends ClientNamenodeProtocol.BlockingInterface {
}

public class ClientNamenodeProtocolServerSideTranslatorPB
    implements ClientNamenodeProtocolPB {
  ...
}
```
Constructing clientNNPbService, the service that talks to HDFS clients:
```java
ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator =
    new ClientNamenodeProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = ClientNamenodeProtocol
    .newReflectiveBlockingService(clientProtocolServerTranslator);
```
Inheritance chain: ProtobufRpcEngine.Server extends RPC.Server, which extends ipc.Server. On the server side, Handler threads process the requests; the key code is:
```java
final Call call = callQueue.take(); // pop the queue; maybe blocked here
// ...
// the key Handler step; call(...) is an abstract method
value = call(call.rpcKind, call.connection.protocolName,
    call.rpcRequest, call.timestamp);
```
The call method here is RPC.Server.call:
```java
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime);
}
```
The mapping from RPC_PROTOCOL_BUFFER to ProtoBufRpcInvoker is registered in a static initializer of the ProtobufRpcEngine class:
```java
static {
  // Register the rpcRequest deserializer for WritableRpcEngine
  org.apache.hadoop.ipc.Server.registerProtocolEngine(
      RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
      new Server.ProtoBufRpcInvoker());
}
```
In the Server class, registerProtocolEngine fills in rpcKindMap; note how re-registration is handled:
```java
public static void registerProtocolEngine(RPC.RpcKind rpcKind,
    Class<? extends Writable> rpcRequestWrapperClass, RpcInvoker rpcInvoker) {
  RpcKindMapValue old = rpcKindMap.put(rpcKind,
      new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
  if (old != null) {
    rpcKindMap.put(rpcKind, old);
    throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
  }
  LOG.debug("rpcKind=" + rpcKind + ", rpcRequestWrapperClass="
      + rpcRequestWrapperClass + ", rpcInvoker=" + rpcInvoker);
}
```
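The registration logic is worth a second look: put returns the previous value, and on a conflict the old entry is restored before the exception is thrown, so the first registration for an rpcKind always wins. A small stand-alone sketch of the same idiom (Strings stand in for the RpcKindMapValue and RpcInvoker types):

```java
import java.util.HashMap;
import java.util.Map;

// Toy version of Server.registerProtocolEngine: put the new entry, and if one
// already existed, put the old entry back and fail - first registration wins.
class RpcRegistry {
  private final Map<String, String> rpcKindMap = new HashMap<>();

  void registerProtocolEngine(String rpcKind, String rpcInvoker) {
    String old = rpcKindMap.put(rpcKind, rpcInvoker);
    if (old != null) {
      rpcKindMap.put(rpcKind, old); // restore the original mapping
      throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
    }
  }

  String getRpcInvoker(String rpcKind) {
    return rpcKindMap.get(rpcKind);
  }

  public static void main(String[] args) {
    RpcRegistry r = new RpcRegistry();
    r.registerProtocolEngine("RPC_PROTOCOL_BUFFER", "ProtoBufRpcInvoker");
    try {
      r.registerProtocolEngine("RPC_PROTOCOL_BUFFER", "SomeOtherInvoker");
    } catch (IllegalArgumentException expected) {
      // re-registration rejected; the first invoker stays mapped
    }
    System.out.println(r.getRpcInvoker("RPC_PROTOCOL_BUFFER"));
    // prints ProtoBufRpcInvoker
  }
}
```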
So

```java
getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime);
```

ends up invoking the call method of the ProtoBufRpcInvoker object:
```java
public Writable call(RPC.Server server, String protocol,
    Writable writableRequest, long receiveTime) throws Exception {
  ...
  result = service.callBlockingMethod(methodDescriptor, null, param);
  ...
}
```
Summary
NameNodeRpcServer opens the listener for HDFS client requests.
When a request arrives, a Handler thread picks it up and invokes the call method of the ProtobufRpcEngine.Server.ProtoBufRpcInvoker object.
That call method looks up the BlockingService object, which dispatches to the corresponding method of ClientNamenodeProtocolServerSideTranslatorPB.
The translator in turn calls the matching NameNodeRpcServer interface method, producing the response to the client's RPC request.
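The whole chain can be condensed into a toy sketch. All names are simplified stand-ins and Strings replace the protobuf messages; this is an illustration of the layering, not Hadoop code:

```java
// Handler -> RpcInvoker -> BlockingService -> server-side translator -> impl.
interface MiniRpcInvoker {
  String call(String protocol, String request);
}

class DispatchChainDemo {
  // layer 4: stands in for the NameNodeRpcServer method doing the real work
  static String nameNodeGetBlockLocations(String path) {
    return "blocks-of(" + path + ")";
  }

  // layer 3: stands in for ClientNamenodeProtocolServerSideTranslatorPB,
  // unwrapping the "proto" request before calling the implementation
  static String translatorGetBlockLocations(String protoRequest) {
    return nameNodeGetBlockLocations(protoRequest.replace("proto:", ""));
  }

  // layer 2: stands in for BlockingService.callBlockingMethod dispatching
  static String callBlockingMethod(String method, String request) {
    if ("getBlockLocations".equals(method)) {
      return translatorGetBlockLocations(request);
    }
    throw new IllegalArgumentException("unknown method: " + method);
  }

  // layer 1: stands in for ProtoBufRpcInvoker, registered for RPC_PROTOCOL_BUFFER
  static final MiniRpcInvoker PB_INVOKER =
      (protocol, request) -> callBlockingMethod("getBlockLocations", request);

  public static void main(String[] args) {
    // a Handler thread would pop a Call from callQueue and then do this:
    System.out.println(PB_INVOKER.call("ClientNamenodeProtocolPB", "proto:/user/a"));
    // prints blocks-of(/user/a)
  }
}
```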