How the HBase 1.2.3 client locates the HRegion for a request
2018-01-24 09:39
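1. Why location is needed
HBase uses a master/slave architecture. By default it relies on ZooKeeper election for HMaster high availability: candidate masters watch an ephemeral node and race to create it, much like acquiring a distributed lock, and the one that succeeds becomes the new master. One HMaster usually manages several HRegionServers, and each HRegionServer can host many HRegions. Note that a table's data initially lives in a single HRegion; as the table grows, its regions are split, so the table's data will very likely end up in HRegions on different HRegionServers, with each HRegion storing one contiguous range of a table's rows. So when we query HBase with a tableName and a rowkey, how does the client work out which HRegion holds the data for that rowkey? Below we walk through the source code using a single-rowkey get as the example (other operations such as scan locate regions with the same code).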
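Because every region owns one contiguous, sorted key range, "which region holds this rowkey" comes down to "which region has the greatest start key that is less than or equal to the row". The following is a minimal sketch of that idea, not HBase code: the class and method names (RegionRangeLookup, addRegion, regionFor) are hypothetical, a TreeMap stands in for the table's region boundaries, and keys are compared as unsigned bytes, which is how HBase orders row keys. It assumes the regions cover the whole key space without gaps.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not HBase code: each region owns [startKey, nextStartKey),
// so the owning region is the one with the greatest start key <= row.
class RegionRangeLookup {

    /** Lexicographic comparison of unsigned bytes, the order HBase uses for row keys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    // start key -> region name; the first region of a table has an empty start key
    private final TreeMap<byte[], String> regionsByStartKey =
            new TreeMap<byte[], String>(RegionRangeLookup::compareUnsigned);

    void addRegion(String startKey, String regionName) {
        regionsByStartKey.put(startKey.getBytes(StandardCharsets.UTF_8), regionName);
    }

    /** Assumes the regions cover the whole key space without gaps. */
    String regionFor(String row) {
        Map.Entry<byte[], String> e =
                regionsByStartKey.floorEntry(row.getBytes(StandardCharsets.UTF_8));
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        RegionRangeLookup table = new RegionRangeLookup();
        table.addRegion("", "region-A");   // ["", "g")
        table.addRegion("g", "region-B");  // ["g", "p")
        table.addRegion("p", "region-C");  // ["p", end of table)
        System.out.println(table.regionFor("TTTTT1201701011200222")); // region-A ('T' < 'g')
        System.out.println(table.regionFor("hello"));                 // region-B
    }
}
```

A sorted map keeps the lookup logarithmic in the number of regions; the real client has to answer the same question, but the boundaries live in hbase:meta rather than in local memory.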
2. How the region is located
Let's start with a simple unit test:
@Test
public void readFromHbase() throws IOException {
    HBaseSource hbaseSource = HBaseSource.getHbaseSource(appName);
    hbaseSource.openConnection();
    Result rs = hbaseSource.searchDataByGet("TABLE", "TTTTT1201701011200222");
    for (KeyValue kv : rs.raw()) {
        System.out.println("row:" + new String(kv.getRow()));
        System.out.println("qualifier-value:" + new String(kv.getQualifier()) + ";" + new String(kv.getValue()));
    }
}
Here searchDataByGet simply queries data by tableName and rowkey. Below we analyze how the data is fetched from these two inputs.
2.1 First, note one detail in how the Table instance is initialized
private void finishSetup() throws IOException {
    if (connConfiguration == null) {
        connConfiguration = new ConnectionConfiguration(configuration);
    }

    this.operationTimeout = tableName.isSystemTable() ?
        connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
    this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    this.scannerCaching = connConfiguration.getScannerCaching();
    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
    if (this.rpcCallerFactory == null) {
        this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
    }
    if (this.rpcControllerFactory == null) {
        this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
    }

    // puts need to track errors globally due to how the APIs currently work.
    multiAp = this.connection.getAsyncProcess();

    this.closed = false;

    this.locator = new HRegionLocator(tableName, connection);
}
The last line creates an HRegionLocator object; connection can be thought of as a connector to the whole cluster. The HRegionLocator constructor just stores both:
public HRegionLocator(TableName tableName, ClusterConnection connection) {
    this.connection = connection;
    this.tableName = tableName;
}
2.2 Executing the get
The get in our code calls the following method in HTable:
@Override
public Result get(final Get get) throws IOException {
    return get(get, get.isCheckExistenceOnly());
}
which in turn calls:
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings to the get, clone it.
    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
        get = ReflectionUtils.newInstance(get.getClass(), get);
        get.setCheckExistenceOnly(checkExistenceOnly);
        if (get.getConsistency() == null){
            get.setConsistency(defaultConsistency);
        }
    }

    // HBase uses strong consistency by default
    if (get.getConsistency() == Consistency.STRONG) {
        // Good old call.
        // the anonymous class below can only capture a final variable
        final Get getReq = get;
        // Build a RegionServerCallable<Result>; the retrying caller invokes its
        // prepare() method before call()
        RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
                getName(), get.getRow()) {
            @Override
            public Result call(int callTimeout) throws IOException {
                ClientProtos.GetRequest request =
                    RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
                PayloadCarryingRpcController controller = rpcControllerFactory.newController();
                controller.setPriority(tableName);
                controller.setCallTimeout(callTimeout);
                try {
                    ClientProtos.GetResponse response = getStub().get(controller, request);
                    if (response == null) return null;
                    return ProtobufUtil.toResult(response.getResult());
                } catch (ServiceException se) {
                    throw ProtobufUtil.getRemoteException(se);
                }
            }
        };
        return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
            this.operationTimeout);
    }

    // Call that takes into account the replica
    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
        rpcControllerFactory, tableName, this.connection, get, pool,
        connConfiguration.getRetriesNumber(),
        operationTimeout,
        connConfiguration.getPrimaryCallTimeoutMicroSecond());
    return callable.call();
}
The RegionServerCallable constructor just stores the connection, table name and row:
public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
    this.connection = connection;
    this.tableName = tableName;
    this.row = row;
}
/**
 * Prepare for connection to the server hosting region with row from tablename.  Does lookup
 * to find region location and hosting server.
 * @param reload Set this to true if connection should re-find the region
 * @throws IOException e
 */
@Override
public void prepare(final boolean reload) throws IOException {
    // First obtain a RegionLocator for the table
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        // Look up the HRegion location for the row; reload decides whether the cache may be used
        this.location = regionLocator.getRegionLocation(row, reload);
    }
    if (this.location == null) {
        throw new IOException("Failed to find location, tableName=" + tableName +
            ", row=" + Bytes.toString(row) + ", reload=" + reload);
    }
    // With the location known, set up the RPC stub to the hosting server to fetch the data
    setStub(getConnection().getClient(this.location.getServerName()));
}
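The contract between the retrying caller and the callable can be modeled in a few lines: before each attempt the caller runs prepare(reload), then call(). In HBase 1.2 the retrying caller passes `tries != 0` as reload, so only retries bypass the location cache. The sketch below is a simplified, hypothetical model (RetryableCall and SimpleRetryingCaller are invented names, and the linear backoff is an assumption, not HBase's actual pause schedule).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the retry loop around RegionServerCallable:
// prepare(reload) locates the region, call() performs the RPC.
class SimpleRetryingCaller {

    interface RetryableCall<T> {
        void prepare(boolean reload) throws IOException; // (re)locate the region
        T call() throws IOException;                      // send the request to the server
    }

    private final int maxTries;
    private final long pauseMs;

    SimpleRetryingCaller(int maxTries, long pauseMs) {
        this.maxTries = maxTries;
        this.pauseMs = pauseMs;
    }

    <T> T callWithRetries(RetryableCall<T> callable) throws IOException, InterruptedException {
        IOException last = null;
        for (int tries = 0; tries < maxTries; tries++) {
            try {
                // The first attempt may use a cached location; retries force a fresh lookup.
                callable.prepare(tries != 0);
                return callable.call();
            } catch (IOException e) {
                last = e;
                if (tries < maxTries - 1) {
                    Thread.sleep(pauseMs * (tries + 1)); // linear backoff, an assumption
                }
            }
        }
        throw new IOException("giving up after " + maxTries + " tries", last);
    }

    public static void main(String[] args) throws Exception {
        List<Boolean> reloadFlags = new ArrayList<>();
        RetryableCall<String> flaky = new RetryableCall<String>() {
            private int attempts = 0;

            @Override
            public void prepare(boolean reload) {
                reloadFlags.add(reload);
            }

            @Override
            public String call() throws IOException {
                if (++attempts < 3) {
                    throw new IOException("region moved"); // first two attempts fail
                }
                return "row-data";
            }
        };
        String result = new SimpleRetryingCaller(5, 1).callWithRetries(flaky);
        System.out.println(result + " " + reloadFlags); // row-data [false, true, true]
    }
}
```

Forcing a reload only on retries keeps the common path cheap (no hbase:meta round trip) while still recovering when a region has moved.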
The getRegionLocator call above simply instantiates an HRegionLocator:
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    return new HRegionLocator(tableName, this);
}
Now we finally reach the main topic: how the matching region is found.
@Override
public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
        throws IOException {
    // tableName was already set when this locator was constructed
    return connection.getRegionLocation(tableName, row, reload);
}
@Override
public HRegionLocation getRegionLocation(final TableName tableName,
        final byte [] row, boolean reload) throws IOException {
    // reload == true: bypass the cache; reload == false: use the cache
    return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
}
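To see what the reload flag buys, here is a toy model of the two paths. All names (LocationCacheDemo, metaLookup, metaLookups) are hypothetical; the hbase:meta lookup is faked by a Function, and entries are keyed by row for brevity, whereas HBase's client cache is organized per region. A stale cached location keeps being returned until a caller asks for reload = true, which drops the entry, goes back to "meta" and caches the fresh answer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of: reload ? relocateRegion(...) : locateRegion(...)
class LocationCacheDemo {
    private final Map<String, String> cache = new HashMap<>(); // row -> server (simplified)
    private final Function<String, String> metaLookup;         // stands in for the hbase:meta scan
    int metaLookups = 0;

    LocationCacheDemo(Function<String, String> metaLookup) {
        this.metaLookup = metaLookup;
    }

    String getRegionLocation(String row, boolean reload) {
        return reload ? relocateRegion(row) : locateRegion(row);
    }

    /** useCache = true: serve a cached entry when there is one. */
    private String locateRegion(String row) {
        String cached = cache.get(row);
        return cached != null ? cached : lookupAndCache(row);
    }

    /** useCache = false: drop any cached entry so it cannot interfere, then go to meta. */
    private String relocateRegion(String row) {
        cache.remove(row);
        return lookupAndCache(row);
    }

    private String lookupAndCache(String row) {
        metaLookups++;
        String server = metaLookup.apply(row);
        cache.put(row, server);
        return server;
    }

    public static void main(String[] args) {
        String[] server = {"rs1:16020"};
        LocationCacheDemo d = new LocationCacheDemo(row -> server[0]);
        d.getRegionLocation("r1", false);                      // meta lookup #1, then cached
        server[0] = "rs2:16020";                               // the region moves
        System.out.println(d.getRegionLocation("r1", false));  // rs1:16020 (stale cache)
        System.out.println(d.getRegionLocation("r1", true));   // rs2:16020 (meta lookup #2)
        System.out.println(d.metaLookups);                     // 2
    }
}
```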
Let's follow the path that uses the cache, i.e. the call to:
@Override
public HRegionLocation locateRegion(
        final TableName tableName, final byte[] row) throws IOException{
    RegionLocations locations = locateRegion(tableName, row, true, true);
    return locations == null ? null : locations.getRegionLocation();
}
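A note on the RegionLocations class returned here: it has a member array that maps an index (the replica id) to an HRegionLocation, and the index used here is 0, the default replica.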
private final HRegionLocation[] locations; // replicaId -> HRegionLocation.
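Slot 0 of that array is the primary replica and higher slots are read replicas. A tiny hypothetical stand-in (SimpleRegionLocations, with plain "host:port" strings instead of HRegionLocation objects) shows the indexing that getRegionLocation() and getRegionLocation(replicaId) rely on; an unknown replica simply yields null, which is why the calling code checks for null.

```java
// Hypothetical stand-in for RegionLocations: array index == replicaId.
class SimpleRegionLocations {
    static final int DEFAULT_REPLICA_ID = 0; // the primary replica

    private final String[] locations; // replicaId -> location ("host:port")

    SimpleRegionLocations(String... locations) {
        this.locations = locations;
    }

    /** Location of the given replica, or null if that replica is unknown. */
    String getRegionLocation(int replicaId) {
        if (replicaId < 0 || replicaId >= locations.length) {
            return null;
        }
        return locations[replicaId];
    }

    /** Shortcut for the primary replica, as used by locateRegion above. */
    String getRegionLocation() {
        return getRegionLocation(DEFAULT_REPLICA_ID);
    }

    public static void main(String[] args) {
        SimpleRegionLocations locs = new SimpleRegionLocations("rs1:16020", "rs2:16020");
        System.out.println(locs.getRegionLocation());  // rs1:16020
        System.out.println(locs.getRegionLocation(1)); // rs2:16020
        System.out.println(locs.getRegionLocation(2)); // null
    }
}
```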
HRegionLocation is what actually holds a region's location: the region's metadata and the server that hosts it.
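public class HRegionLocation implements Comparable<HRegionLocation> {
    private final HRegionInfo regionInfo;  // info about the region that contains the row
    private final ServerName serverName;   // the RegionServer hosting that region
    private final long seqNum;             // sequence number recorded when the region was opened there
    // ... (rest omitted)
}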
Continuing down the call chain:
@Override
public RegionLocations locateRegion(final TableName tableName,
        final byte [] row, boolean useCache, boolean retry)
        throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
which calls:
@Override
public RegionLocations locateRegion(final TableName tableName,
        final byte [] row, boolean useCache, boolean retry, int replicaId)
        throws IOException {
    if (this.closed) throw new IOException(toString() + " closed");
    if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
    }
    // The requested table is hbase:meta itself
    if (tableName.equals(TableName.META_TABLE_NAME)) {
        return locateMeta(tableName, useCache, replicaId);
    } else {
        // Any other table: locateRegionInMeta() checks the cache and, on a miss,
        // queries the RegionServer hosting hbase:meta
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
    }
}
Locating the region:
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
        boolean useCache, boolean retry, int replicaId) throws IOException {
    // If we are supposed to be using the cache, look in the cache to see if
    // we already have the region.
    if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
        }
    }

    // Not cached: build the meta row key to search for.
    // build the key of the meta region we should be looking for.
    // the extra 9's on the end are necessary to allow "exact" matches
    // without knowing the precise region names.
    byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);

    // Build a reversed small Scan over hbase:meta: locating the region for a Get
    // is itself done by scanning meta
    Scan s = new Scan();
    s.setReversed(true);
    // the scan starts at the metaKey built above and walks backwards
    s.setStartRow(metaKey);
    s.setSmall(true);
    s.setCaching(1);
    if (this.useMetaReplicas) {
        s.setConsistency(Consistency.TIMELINE);
    }

    // Number of attempts, 35 by default, which arguably seems like a lot
    int localNumRetries = (retry ? numTries : 1);

    for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
            throw new NoServerForRegionException("Unable to find region for "
                + Bytes.toStringBinary(row) + " in " + tableName
                + " after " + localNumRetries + " tries.");
        }
        // Re-check the cache on every attempt: another thread may have filled it meanwhile
        if (useCache) {
            RegionLocations locations = getCachedLocation(tableName, row);
            if (locations != null && locations.getRegionLocation(replicaId) != null) {
                return locations;
            }
        } else {
            // If we are not supposed to be using the cache, delete any existing cached location
            // so it won't interfere.
            metaCache.clearCache(tableName, row);
        }

        // Query the meta region
        try {
            Result regionInfoRow = null;
            ReversedClientScanner rcs = null;
            try {
                // The call chain here is deep, but in the end this scanner queries
                // hbase:meta (note the third constructor argument) for the
                // table/rowkey -> region mapping
                rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
                    rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
                // take the first row the scanner returns
                regionInfoRow = rcs.next();
            } finally {
                if (rcs != null) {
                    rcs.close();
                }
            }

            if (regionInfoRow == null) {
                throw new TableNotFoundException(tableName);
            }

            // convert the row result into the HRegionLocation we need!
            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
                throw new IOException("HRegionInfo was null in " +
                    tableName + ", row=" + regionInfoRow);
            }
            HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
            // no region info at this replica index
            if (regionInfo == null) {
                throw new IOException("HRegionInfo was null or empty in " +
                    TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
            }

            // possible we got a region of a different table...
            if (!regionInfo.getTable().equals(tableName)) {
                throw new TableNotFoundException(
                    "Table '" + tableName + "' was not found, got: " +
                    regionInfo.getTable() + ".");
            }
            // the region found is a split parent (it has already been split)
            if (regionInfo.isSplit()) {
                throw new RegionOfflineException("the only available region for" +
                    " the required row is a split parent," +
                    " the daughters should be online soon: " +
                    regionInfo.getRegionNameAsString());
            }
            // the region found is offline
            if (regionInfo.isOffline()) {
                throw new RegionOfflineException("the region is offline, could" +
                    " be caused by a disable table call: " +
                    regionInfo.getRegionNameAsString());
            }

            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
                throw new NoServerForRegionException("No server address listed " +
                    "in " + TableName.META_TABLE_NAME + " for region " +
                    regionInfo.getRegionNameAsString() + " containing row " +
                    Bytes.toStringBinary(row));
            }

            // the hosting HRegionServer is known to be dead
            if (isDeadServer(serverName)){
                throw new RegionServerStoppedException("hbase:meta says the region "+
                    regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                    ", but it is dead.");
            }
            // All checks passed: cache the tableName/rowkey -> region location
            // Instantiate the location
            cacheLocation(tableName, locations);
            return locations;
        } catch (TableNotFoundException e) {
            // if we got this error, probably means the table just plain doesn't
            // exist. rethrow the error immediately. this should always be coming
            // from the HTable constructor.
            throw e;
        } catch (IOException e) {
            ExceptionUtil.rethrowIfInterrupt(e);

            if (e instanceof RemoteException) {
                e = ((RemoteException)e).unwrapRemoteException();
            }
            if (tries < localNumRetries - 1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("locateRegionInMeta parentTable=" +
                        TableName.META_TABLE_NAME + ", metaLocation=" +
                        ", attempt=" + tries + " of " +
                        localNumRetries + " failed; retrying after sleep of " +
                        ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
                }
            } else {
                throw e;
            }
            // Only relocate the parent region if necessary
            if(!(e instanceof RegionOfflineException ||
                e instanceof NoServerForRegionException)) {
                relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
            }
        }
        try{
            Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
            throw new InterruptedIOException("Giving up trying to location region in " +
                "meta: thread is interrupted.");
        }
    }
}
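Why does a reversed scan starting at metaKey land on the right region? Rows in hbase:meta are keyed tableName,startKey,regionId, and metaKey uses the row as the start key plus a run of nines that sorts after any real region id. So the first meta row at or before metaKey, which is exactly what a reversed scan with caching 1 returns, is the region of the same table with the greatest start key not exceeding the row. The sketch below models this with a TreeMap and floorKey. MetaLookupSketch, MetaRowKey and META_ORDER are hypothetical simplifications of HBase's meta ordering (strings stand in for byte arrays, and the NINES value here is an assumption). Comparing field by field matters: a plain string comparison would mis-order rows containing characters that sort below ','.

```java
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical model of locating a region through hbase:meta with a reversed scan.
class MetaLookupSketch {

    /** Simplified meta row key: tableName,startKey,regionId. */
    static final class MetaRowKey {
        final String table;
        final String startKey;
        final long regionId;

        MetaRowKey(String table, String startKey, long regionId) {
            this.table = table;
            this.startKey = startKey;
            this.regionId = regionId;
        }
    }

    // Field-by-field order, so a ',' or smaller character inside a rowkey cannot confuse it.
    static final Comparator<MetaRowKey> META_ORDER = (a, b) -> {
        int c = a.table.compareTo(b.table);
        if (c != 0) return c;
        c = a.startKey.compareTo(b.startKey);
        if (c != 0) return c;
        return Long.compare(a.regionId, b.regionId);
    };

    // Assumption: stands in for HConstants.NINES, i.e. larger than any real region id.
    static final long NINES = 99_999_999_999_999L;

    // meta row key -> hosting server
    final TreeMap<MetaRowKey, String> meta = new TreeMap<>(META_ORDER);

    void addRegion(String table, String startKey, long regionId, String server) {
        meta.put(new MetaRowKey(table, startKey, regionId), server);
    }

    /** Like the reversed small scan with caching 1: take the first meta row <= metaKey. */
    String locate(String table, String row) {
        MetaRowKey metaKey = new MetaRowKey(table, row, NINES);
        MetaRowKey found = meta.floorKey(metaKey);
        // Mirrors the "possible we got a region of a different table" check above
        if (found == null || !found.table.equals(table)) {
            throw new IllegalStateException("Table '" + table + "' was not found");
        }
        return meta.get(found);
    }

    public static void main(String[] args) {
        MetaLookupSketch m = new MetaLookupSketch();
        m.addRegion("TABLE", "", 1516000000000L, "rs1:16020");
        m.addRegion("TABLE", "TTTTT2", 1516000000001L, "rs2:16020");
        m.addRegion("other", "", 1516000000002L, "rs3:16020");
        System.out.println(m.locate("TABLE", "TTTTT1201701011200222")); // rs1:16020
        System.out.println(m.locate("TABLE", "TTTTT2a"));               // rs2:16020
    }
}
```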
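At this point we have broadly covered how a tableName and rowkey are resolved to the region that stores them. Many branches were left out, such as reading from and writing to the cache, and how the case where tableName is the meta table itself is handled. To summarize:
1. For a non-meta table, the client ultimately queries the hbase:meta table for the mapping between that table's rows and its regions, and from it finds the region's location.
2. Once the location is found, it builds an RPC request to that region to fetch the data.
3. It caches the mapping from tableName and rowkey to the region location, so the next request can be served straight from the cache.
4. If an HRegionServer goes down and the address cached on the client becomes invalid, the client locates the region again (re-reading hbase:meta, whose own location the client obtains from ZooKeeper) and caches the new address.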
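Points 3 and 4 together describe a cache-then-fallback loop. Below is a hypothetical end-to-end sketch of it (StaleLocationRecovery and its fields are invented; "meta" is a plain Map, dead servers are a Set, and only a single retry is shown). It assumes that when a server is found dead, the client drops every cached entry pointing at that server, not just the one it was using, so other regions that lived there are also re-resolved instead of hitting the dead server first.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of summary points 3 and 4: cached locations, recovery after a server dies.
class StaleLocationRecovery {
    final Map<String, String> meta = new HashMap<>();  // region -> server, as recorded in "meta"
    final Map<String, String> cache = new HashMap<>(); // client-side cache: region -> server
    final Set<String> deadServers = new HashSet<>();
    int metaLookups = 0;

    private String locate(String region, boolean useCache) {
        if (useCache && cache.containsKey(region)) {
            return cache.get(region);
        }
        metaLookups++;
        String server = meta.get(region);
        cache.put(region, server);
        return server;
    }

    /** One read: try the cached server; if it is dead, forget it and re-locate once. */
    String get(String region, String row) {
        String server = locate(region, true);
        if (deadServers.contains(server)) {
            final String dead = server;
            cache.values().removeIf(dead::equals); // evict every entry on the dead server
            server = locate(region, false);        // go back to meta for a fresh address
        }
        return row + "@" + server;
    }

    public static void main(String[] args) {
        StaleLocationRecovery client = new StaleLocationRecovery();
        client.meta.put("r1", "rs1");
        System.out.println(client.get("r1", "k")); // k@rs1 (meta lookup, then cached)
        client.deadServers.add("rs1");             // rs1 crashes...
        client.meta.put("r1", "rs2");              // ...and r1 is reassigned to rs2
        System.out.println(client.get("r1", "k")); // k@rs2 (cache evicted, meta re-read)
        System.out.println(client.get("r1", "k")); // k@rs2 (served from cache again)
        System.out.println(client.metaLookups);    // 2
    }
}
```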