您的位置:首页 > 其它

HBase1.2.3版本客户端请求如何定位HRegion的位置

2018-01-24 09:39 393 查看

一、为什么需要定位

      HBase采用主从(master/slave)架构,默认借助ZooKeeper的选举机制来实现HMaster的高可用:候选的HMaster监听同一个临时节点,以类似分布式锁的方式争抢创建该节点,创建成功者成为新的master。

      一个HMaster通常会对应多个HRegionServer,而每一个HRegionServer又可以有多个HRegion。需要注意的是,表的数据刚开始只会存在于一个HRegion里面,但是随着表数据量的增大,会发生split(region拆分)操作,之后表数据很可能分布在不同HRegionServer的HRegion里面,而每个HRegion存的是某张表的一段连续rowkey区间的数据。当我们拿着一个tableName和rowkey去HBase上查数据的时候,它是怎么定位到是哪一个HRegion里存着这个rowkey对应的数据的呢?下面我们以单个rowkey的get操作为例来看一下源码实现(其它操作如scan等,在定位Region部分的源码实现也是一样的)。

二、如何定位

 

     先看一段简单的单元测试

/**
 * Reads one row from HBase by table name and row key and prints the row, qualifier and value
 * of every returned cell.
 *
 * <p>{@code HBaseSource} and {@code appName} are the article author's own helper class and
 * constant; they are not part of the HBase API.
 */
@Test
public void readFromHbase() throws IOException {
  HBaseSource hbaseSource = HBaseSource.getHbaseSource(appName);
  hbaseSource.openConnection();
  // TODO(review): the connection opened above is never closed; release it with whatever
  // close method HBaseSource provides.
  Result rs = hbaseSource.searchDataByGet("TABLE", "TTTTT1201701011200222");
  // Result.raw() can be null when the row does not exist, so guard before iterating.
  if (rs == null || rs.isEmpty()) {
    System.out.println("no data found");
    return;
  }
  for (KeyValue kv : rs.raw()) {
    // Decode explicitly as UTF-8 rather than with the platform default charset.
    System.out.println("row:"
        + new String(kv.getRow(), java.nio.charset.StandardCharsets.UTF_8));
    System.out.println("qualifier-value:"
        + new String(kv.getQualifier(), java.nio.charset.StandardCharsets.UTF_8) + ";"
        + new String(kv.getValue(), java.nio.charset.StandardCharsets.UTF_8));
  }
}

     这里面的searchDataByGet方法就是通过tableName和rowkey去查询数据,下面我们就来分析如何根据这两个条件去获取数据。

     2.1  首先在初始化Table实例的时候,有一个地方需要注意

   

private void finishSetup() throws IOException {
if (connConfiguration == null) {
connConfiguration = new ConnectionConfiguration(configuration);
}
this.operationTimeout = tableName.is
4000
SystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
}
if (this.rpcControllerFactory == null) {
this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
}
// puts need to track errors globally due to how the APIs currently work.
multiAp = this.connection.getAsyncProcess();
this.closed = false;
this.locator = new HRegionLocator(tableName, connection);
}


     构造函数初始化HRegionLocator对象,connection可以理解为一个集群连接器。

/**
 * Creates a locator for the regions of a single table.
 *
 * @param tableName table whose regions this locator resolves
 * @param connection cluster connection through which lookups are delegated
 */
public HRegionLocator(TableName tableName, ClusterConnection connection) {
  this.tableName = tableName;
  this.connection = connection;
}


    2.2  执行get操作

       代码里的get会调用HTable里的下面的方法

  

/**
 * Fetches the row described by {@code get}, keeping the request's own
 * check-existence-only flag.
 */
@Override
public Result get(final Get get) throws IOException {
  final boolean existenceOnly = get.isCheckExistenceOnly();
  return get(get, existenceOnly);
}


    接着调用
   

/**
 * Executes a Get against this table.
 *
 * <p>If the requested check-existence flag differs from the Get's, or the Get has no
 * consistency set, the Get is cloned first so the caller's object is not mutated. STRONG
 * consistency sends a single retried RPC; any other consistency goes through
 * RpcRetryingCallerWithReadReplicas.
 *
 * @param get the request
 * @param checkExistenceOnly value to force onto the (possibly cloned) request
 * @return the result, or null if the server returned no response
 * @throws IOException on RPC failure after retries
 */
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
// if we are changing settings to the get, clone it.
if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
get = ReflectionUtils.newInstance(get.getClass(), get);
get.setCheckExistenceOnly(checkExistenceOnly);
if (get.getConsistency() == null){
get.setConsistency(defaultConsistency);
}
}
// Strong-consistency path. NOTE(review): the article states STRONG is HBase's default;
// the value of defaultConsistency is not shown here - confirm.
if (get.getConsistency() == Consistency.STRONG) {
// Good old call.
// The anonymous callable below can only capture a final local, hence getReq.
final Get getReq = get;
// Callable whose call() sends the GetRequest to the server hosting the row's region.
// NOTE(review): per the article, the retrying caller invokes its prepare() (the region
// lookup) before call(); that part is not shown in this excerpt.
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
getName(), get.getRow()) {
@Override
public Result call(int callTimeout) throws IOException {
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
// Call that takes into account the replica
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool,
connConfiguration.getRetriesNumber(),
operationTimeout,
connConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}


      初始化好连接,表名,row

   

/**
 * Creates a callable bound to one row of one table. The region location itself is resolved
 * later, in {@code prepare}.
 *
 * @param connection connection used to reach the cluster
 * @param tableName table the row belongs to
 * @param row row key the call targets
 */
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
  this.row = row;
  this.tableName = tableName;
  this.connection = connection;
}


   
/**
 * Resolves which server hosts the region containing {@code row} of {@code tableName} and
 * binds an RPC stub to that server.
 *
 * @param reload true to bypass cached locations and look the region up again
 * @throws IOException if the lookup fails or yields no location
 */
@Override
public void prepare(final boolean reload) throws IOException {
  // A short-lived locator for this table answers "where does this row live?".
  try (RegionLocator locator = connection.getRegionLocator(tableName)) {
    this.location = locator.getRegionLocation(row, reload);
  }
  if (this.location != null) {
    // Location known: create the RPC client stub for the hosting server.
    setStub(getConnection().getClient(this.location.getServerName()));
    return;
  }
  final String detail = "Failed to find location, tableName=" + tableName
      + ", row=" + Bytes.toString(row) + ", reload=" + reload;
  throw new IOException(detail);
}


    上面的getRegionLocator实际上是实例化了一个HRegionLocator(RegionLocator的实现类)

    

/**
 * Returns a new region locator for {@code tableName} that uses this connection.
 */
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
  final RegionLocator locator = new HRegionLocator(tableName, this);
  return locator;
}


     终于进入主题,看如何找到对应的region

  

/**
 * Finds the location of the region of this locator's table that contains {@code row}.
 *
 * @param row row key to locate
 * @param reload true to ignore cached locations and look the region up again
 */
@Override
public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
    throws IOException {
  // The table was fixed when this locator was constructed.
  final TableName table = this.tableName;
  return connection.getRegionLocation(table, row, reload);
}

   
/**
 * Finds the location of the region of {@code tableName} that contains {@code row}.
 *
 * @param reload true to skip the cache (relocate); false to use a cache-first lookup
 */
@Override
public HRegionLocation getRegionLocation(final TableName tableName,
    final byte [] row, boolean reload)
    throws IOException {
  if (reload) {
    return relocateRegion(tableName, row);
  }
  return locateRegion(tableName, row);
}


     我们看使用缓存的情况,即调用

  

/**
 * Locates the region containing {@code row}, using the cache and retries.
 *
 * @return the default location of the found region, or null if nothing was found
 */
@Override
public HRegionLocation locateRegion(
    final TableName tableName, final byte[] row) throws IOException{
  // useCache = true, retry = true
  final RegionLocations all = locateRegion(tableName, row, true, true);
  if (all == null) {
    return null;
  }
  return all.getRegionLocation();
}

 

     这里说一下返回的RegionLocations类:它有个成员变量,维护的是数组下标(即replicaId)到HRegionLocation的映射关系,这里传入的下标是默认副本的replicaId,即0。

 

// Per-replica locations of one region, indexed by replica id.
// NOTE(review): the article states the default replica id used by lookups is 0 - confirm.
private final HRegionLocation[] locations; // replicaId -> HRegionLocation.


     而HRegionLocation才是真正存放region位置信息的地方,即region信息(HRegionInfo)以及该region所在的服务器(ServerName)。

    

public class HRegionLocation implements Comparable<HRegionLocation> {
private final HRegionInfo regionInfo; 当前row对应的region信息
private final ServerName serverName; 服务名
private final long seqNum; rowkey对应的位置编号
。。。。。。。。。省略
}


     继续看调用方法

   

/**
 * Locates the region containing {@code row}, for the default replica.
 */
@Override
public RegionLocations locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry)
    throws IOException {
  final int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
  return locateRegion(tableName, row, useCache, retry, replicaId);
}

      继续调用

    

/**
 * Locates the region containing {@code row} for the given replica.
 *
 * <p>hbase:meta itself is resolved by {@code locateMeta}. Every other table is resolved by
 * {@code locateRegionInMeta}, which consults the cache and then hbase:meta.
 *
 * @throws IOException if this connection is closed or the lookup fails
 * @throws IllegalArgumentException if {@code tableName} is null or empty
 */
@Override
public RegionLocations locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
  if (this.closed) {
    throw new IOException(toString() + " closed");
  }
  final boolean missingName = tableName == null || tableName.getName().length == 0;
  if (missingName) {
    throw new IllegalArgumentException(
        "table name cannot be null or zero length");
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    return locateMeta(tableName, useCache, replicaId);
  }
  // Region not in the cache - have to go to the meta RS
  return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
}


    定位region

 

    

private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
boolean useCache, boolean retry, int replicaId) throws IOException {

// If we are supposed to be using the cache, look in the cache to see if
// we already have the region.
//如果我们使用缓存,先充缓存中找,后面我们再看是怎么在缓存里面获取的

if (useCache) {
RegionLocations locations = getCachedLocation(tableName, row);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
}

//如果缓存中没有,自己构建一个metakey,

// build the key of the meta region we should be looking for.

// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
//构建一个Scan对象,可以看出get操作最终也会转为scan操作
Scan s = new Scan();
s.setReversed(true);
//scan操作的开始行即为刚才构建的Metakey
s.setStartRow(metaKey);
s.setSmall(true);
s.setCaching(1);
if (this.useMetaReplicas) {
s.setConsistency(Consistency.TIMELINE);
}
//获取重试次数,默认35次,感觉是不是太多了
int localNumRetries = (retry ? numTries : 1){
for (int tries = 0; true; tries++) {
if (tries >= localNumRetries) {
throw new NoServerForRegionException("Unable to find region for "
+ Bytes.toStringBinary(row) + " in " + tableName +
" after " + localNumRetries + " tries.");

}
//在重试的过程中,再一次检查缓存是否有数据,因为很可能在重试的时候其他线程往缓存写了
if (useCache) {
RegionLocations locations = getCachedLocation(tableName, row);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
} else {
// If we are not supposed to be using the cache, delete any existing cached location
// so it won't interfere.
//如果不使用缓存,在重试的过程中,清空缓存数据,保证再重试的过程中不干扰查询请求
metaCache.clearCache(tableName, row);
}
// Query the meta region
try {
Result regionInfoRow = null;
ReversedClientScanner rcs = null;
try {
//构造一个ReversedClientScanner 实例,这里方法调整层次很深,实际上它还是会去查hbase:meta表获取表,rowkey和region对应的映射关系,
//看构造函数的第三个参数
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
//获取该实例的下一行
regionInfoRow = rcs.next();
} finally {
if (rcs != null) {
rcs.close();
}
}
if (regionInfoRow == null) {
throw new TableNotFoundException(tableName);
}
//将我们查询返回的Result转为RegionLocations
// convert the row result into the HRegionLocation we need!
RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
if (locations == null || locations.getRegionLocation(replicaId) == null) {
throw new IOException("HRegionInfo was null in " +
tableName + ", row=" + regionInfoRow);
}
HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
//如果根据数组index没有找到对应的region对应信息
if (regionInfo == null) {
throw new IOException("HRegionInfo was null or empty in " +
TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
}

//如果找到表和请求表不一样
// possible we got a region of a different table...
if (!regionInfo.getTable().equals(tableName)) {
throw new TableNotFoundException(
"Table '" + tableName + "' was not found, got: " +
regionInfo.getTable() + ".");
}
//如果找到的region正在进行拆分操作
if (regionInfo.isSplit()) {
throw new RegionOfflineException("the only available region for" +
" the required row is a split parent," +
" the daughters should be online soon: " +
regionInfo.getRegionNameAsString());
}
//如果找到region已经下线
if (regionInfo.isOffline()) {
throw new RegionOfflineException("the region is offline, could" +
" be caused by a disable table call: " +
regionInfo.getRegionNameAsString());
}
ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
if (serverName == null) {
throw new NoServerForRegionException("No server address listed " +
"in " + TableName.META_TABLE_NAME + " for region " +
regionInfo.getRegionNameAsString() + " containing row " +
Bytes.toStringBinary(row));
}
如果找到HRegionServer已经挂了
if (isDeadServer(serverName)){
throw new RegionServerStoppedException("hbase:meta says the region "+
regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
", but it is dead.");
}
//上面的检查都OK的情况,缓存tableName,rowkey到hregion的关系
// Instantiate the location
cacheLocation(tableName, locations);
return locations;
} catch (TableNotFoundException e) {
// if we got this error, probably means the table just plain doesn't
// exist. rethrow the error immediately. this should always be coming
// from the HTable constructor.
throw e;
} catch (IOException e) {
ExceptionUtil.rethrowIfInterrupt(e);
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
if (tries < localNumRetries - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("locateRegionInMeta parentTable=" +
TableName.META_TABLE_NAME + ", metaLocation=" +
", attempt=" + tries + " of " +
localNumRetries + " failed; retrying after sleep of " +
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
}
} else {
throw e;
}
// Only relocate the parent region if necessary
if(!(e instanceof RegionOfflineException ||
e instanceof NoServerForRegionException)) {
relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
}
}
try{
Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Giving up trying to location region in " +
"meta: thread is interrupted.");
}
}
}


      至此,一个tableName、rowkey是如何定位到存储它的region的分析大体已经完成。这里面还有很多分支没有涉及,比如缓存的读取和写入,以及tableName本身就是meta表时是怎么处理的。这里我总结一下:

    1、如果tableName是非meta表,其最终还是会去查Meta表获取非meta表和region相关的映射关系,找到region的位置

    2、位置找到后,构建一个rpc请求对应的region获取数据

    3、缓存tablename,rowkey和region位置的对应关系,下次来的时候直接从缓存里面取

    4、如果HRegionServer宕机,导致客户端缓存的地址不可用,客户端会重新寻址(重新查询hbase:meta表,必要时先从zk获取hbase:meta所在的位置),然后把新的地址缓存到客户端。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐