您的位置:首页 > 其它

HBase1.0.0源码分析之Client启动连接流程

2015-04-01 10:39 597 查看
我们知道在使用HBase的过程中首要的是和服务器端取得链接,那么客户端是如何去链接的,它是怎么找到master和regionserver的? 参与该过程中的主要组件又有哪些?这些组件之间是如何协同工作的呢? 今天就让我们来一起解析.

HBase的连接代码很简单,如下:

try (Connection connection = ConnectionFactory.createConnection(conf))
这里用到了工厂模式进行Connection实例的创建,需要传入的是配置参数管理类Configuration,在创建中首先需要把用户信息添加进去:

if (user == null) {
UserProvider provider = UserProvider.instantiate(conf);
user = provider.getCurrent();
}

return createConnection(conf, false, pool, user);


String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionManager.HConnectionImplementation.class.getName());
Class<?> clazz = null;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class,
boolean.class, ExecutorService.class, User.class);
constructor.setAccessible(true);
return (Connection) constructor.newInstance(conf, managed, pool, user);


这里使用了反射技术进行类对象的构造,从代码中我们看到实际是调用了HConncetionImplementation的构造函数,这些类之间的相互关系如下图所示:



从途中可以看出,HConnectionImplementation是实际的Connction实现类,接下来我们去看看该类的实例化过程:

HConnectionImplementation(Configuration conf, boolean managed,
ExecutorService pool, User user) throws IOException {
this(conf);
this.user = user;
this.batchPool = pool;
this.managed = managed;
this.registry = setupRegistry();
retrieveClusterId();

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

// Do we publish the status?
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
if (shouldListen) {
if (listenerClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
} else {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
}
}
}


好吧这看起来有点小复杂,它首先调用了另一个构造类
protected HConnectionImplementation(Configuration conf) {
this.conf = conf;
this.tableConfig = new TableConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = tableConfig.getRetriesNumber();
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
synchronized (nonceGeneratorCreateLock) {
if (ConnectionManager.nonceGenerator == null) {
ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
}
this.nonceGenerator = ConnectionManager.nonceGenerator;
}
} else {
this.nonceGenerator = new NoNonceGenerator();
}
stats = ServerStatisticTracker.create(conf);
this.asyncProcess = createAsyncProcess(this.conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
}
ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();//每个客户端随机的NonceGEnerator,主要是为了生成clientid

stats = ServerStatisticTracker.create(conf);创建跟踪该connection所相关的region 信息监控实例

this.asyncProcess = createAsyncProcess(this.conf);创建一个同步进程实例,该进程主要负责持续的请求流

this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();//远程服务器出现故障时,进行处理的机制

this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);//RpcRetryingCaller创建工厂

this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);//这个实际没有具体的的类实现

到此结束了下面的那个构造函数,接下来我们回过头来看看上面的构造函数的剩余部分:

同样的我们也就只是分析一些关键步骤:

this.registry = setupRegistry();//用于获取集群的基本信息例如clusterid以及region location的meta数据

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); //负责IPC调用相关

this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);//

至此客户端的启动结束了,这里其实主要是启动两个服务,

一个是用于request处理的AsyncProcess

一个是用于获取服务器信息的Registry

还有就是负责RPC调用的RpcClient,相关主要类图如下:





内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: