HBase1.0.0源码分析之Client启动连接流程
2015-04-01 10:39
597 查看
我们知道在使用HBase的过程中首要的是和服务器端取得链接,那么客户端是如何去链接的,它是怎么找到master和regionserver的? 参与该过程中的主要组件又有哪些?这些组件之间是如何协同工作的呢? 今天就让我们来一起解析.
HBase的连接代码很简单,如下:
这里使用了反射技术进行类对象的构造,从代码中我们看到实际是调用了HConncetionImplementation的构造函数,这些类之间的相互关系如下图所示:
从途中可以看出,HConnectionImplementation是实际的Connction实现类,接下来我们去看看该类的实例化过程:
好吧这看起来有点小复杂,它首先调用了另一个构造类
stats = ServerStatisticTracker.create(conf);创建跟踪该connection所相关的region 信息监控实例
this.asyncProcess = createAsyncProcess(this.conf);创建一个同步进程实例,该进程主要负责持续的请求流
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();//远程服务器出现故障时,进行处理的机制
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);//RpcRetryingCaller创建工厂
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);//这个实际没有具体的的类实现
到此结束了下面的那个构造函数,接下来我们回过头来看看上面的构造函数的剩余部分:
同样的我们也就只是分析一些关键步骤:
this.registry = setupRegistry();//用于获取集群的基本信息例如clusterid以及region location的meta数据
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); //负责IPC调用相关
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);//
至此客户端的启动结束了,这里其实主要是启动两个服务,
一个是用于request处理的AsyncProcess
一个是用于获取服务器信息的Registry
还有就是负责RPC调用的RpcClient,相关主要类图如下:
HBase的连接代码很简单,如下:
try (Connection connection = ConnectionFactory.createConnection(conf))这里用到了工厂模式进行Connection实例的创建,需要传入的是配置参数管理类Configuration,在创建中首先需要把用户信息添加进去:
if (user == null) { UserProvider provider = UserProvider.instantiate(conf); user = provider.getCurrent(); } return createConnection(conf, false, pool, user);
String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL, ConnectionManager.HConnectionImplementation.class.getName()); Class<?> clazz = null; try { clazz = Class.forName(className); } catch (ClassNotFoundException e) { throw new IOException(e); } try { // Default HCM#HCI is not accessible; make it so before invoking. Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class, User.class); constructor.setAccessible(true); return (Connection) constructor.newInstance(conf, managed, pool, user);
这里使用了反射技术进行类对象的构造,从代码中我们看到实际是调用了HConncetionImplementation的构造函数,这些类之间的相互关系如下图所示:
从途中可以看出,HConnectionImplementation是实际的Connction实现类,接下来我们去看看该类的实例化过程:
HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException { this(conf); this.user = user; this.batchPool = pool; this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); if (shouldListen) { if (listenerClass == null) { LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); } else { clusterStatusListener = new ClusterStatusListener( new ClusterStatusListener.DeadServerHandler() { @Override public void newDead(ServerName sn) { clearCaches(sn); rpcClient.cancelConnections(sn); } }, conf, listenerClass); } } }
好吧这看起来有点小复杂,它首先调用了另一个构造类
protected HConnectionImplementation(Configuration conf) { this.conf = conf; this.tableConfig = new TableConfiguration(conf); this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = tableConfig.getRetriesNumber(); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { synchronized (nonceGeneratorCreateLock) { if (ConnectionManager.nonceGenerator == null) { ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator(); } this.nonceGenerator = ConnectionManager.nonceGenerator; } } else { this.nonceGenerator = new NoNonceGenerator(); } stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); }ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();//每个客户端随机的NonceGEnerator,主要是为了生成clientid
stats = ServerStatisticTracker.create(conf);创建跟踪该connection所相关的region 信息监控实例
this.asyncProcess = createAsyncProcess(this.conf);创建一个同步进程实例,该进程主要负责持续的请求流
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();//远程服务器出现故障时,进行处理的机制
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);//RpcRetryingCaller创建工厂
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);//这个实际没有具体的的类实现
到此结束了下面的那个构造函数,接下来我们回过头来看看上面的构造函数的剩余部分:
同样的我们也就只是分析一些关键步骤:
this.registry = setupRegistry();//用于获取集群的基本信息例如clusterid以及region location的meta数据
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); //负责IPC调用相关
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);//
至此客户端的启动结束了,这里其实主要是启动两个服务,
一个是用于request处理的AsyncProcess
一个是用于获取服务器信息的Registry
还有就是负责RPC调用的RpcClient,相关主要类图如下:
相关文章推荐
- HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)
- HBase 0.1.0版本源码分析--Master启动流程
- HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)
- Hbase-0.98.6源码分析--Put写操作Client端流程
- Journey源码分析二:整体启动流程
- 【kubernetes/k8s源码分析】kube-controller-manager 启动流程分析
- hdfs2.6.2源码学习:Day1-NameNode启动流程分析
- Android wifi源码分析(一) Wifi启动流程
- hbase0.98 源码分析-读数据流程
- React Native启动流程集合安卓源码分析
- Spark On YARN启动流程源码分析
- 小伙伴们的ceph源码分析二——monitor启动流程
- Android系统启动流程源码分析
- 结合源码分析HBase相关操作流程
- Hbase 源码分析之 Regionserver上的 Get 全流程
- hdfs2.6.2源码学习:Day2-DataNode启动流程分析
- spark core源码分析4 worker启动流程
- HBase源码分析之Region上Spilt流程
- nginx源码分析—启动流程
- HBase源码分析_Master启动过程