HBase 1.2.3: HRegion Split Trigger Mechanism
2018-01-24 08:55
Section 1: When a Split Is Triggered
1. If the total size of all HFiles under any one HStore of an HRegion exceeds the configured default limit, that HRegion is split.
2. When the number of HRegions on a region server has reached the configured cap, no split is performed.
3. During HFile compaction, if the size of the compacted store exceeds the hbase.hregion.max.filesize setting, a split is performed.
4. At flush time, if the HRegion has too many HFiles and the blocking wait time has not yet elapsed, a split is requested first.
5. After a flush finishes, a check is made to see whether a split is needed.
6. A manual trigger from the hbase shell (see the note after this list).
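A note on the knobs and the manual path (the defaults below are quoted from memory of hbase-default.xml in 1.2.x, so treat them as approximate): the size limit in points 1 and 3 is hbase.hregion.max.filesize (10 GB by default), the per-server cap in point 2 is hbase.regionserver.regionSplitLimit (1000 by default), and the "too many HFiles with a bounded wait" condition in point 4 is governed by hbase.hstore.blockingStoreFiles and hbase.hstore.blockingWaitTime. The manual trigger in point 6 is the shell's split command, for example split 'my_table' to let the split policy pick the point, or split 'my_table', 'rowkey0500' to force a split at a given row (the table and row names here are placeholders).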
Section 2: How the Split Is Performed
Region splitting in HBase is complicated; sticking to the main path, it breaks down into two steps.
1. If the split conditions are met, two new HRegions are first initialized around the split point: HRegion A's startKey is the parent HRegion's startKey and its endKey is the split point, while HRegion B's startKey is the split point and its endKey is the parent's endKey.
2. The split itself then runs in several stages: create the daughter regions, bring the daughters online and update their state in zk, take the parent region offline, and finally mark the split as complete. That is the rough flow; the fine detail is all in the source code if you want to dig in.
How the split point is found:
Compactions and splits in HBase are both handled by a dedicated thread pool, in the class CompactSplitThread:
public synchronized boolean requestSplit(final Region r) {
  // don't split regions that are blocking
  /** Decide whether this region should be split at all */
  if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
    /** Find the split point. There are several split policies; the default in this
     *  version is IncreasingToUpperBoundRegionSplitPolicy, which can loosely be
     *  understood as the midpoint of the largest HFile of the largest HStore
     *  in the HRegion. */
    byte[] midKey = ((HRegion)r).checkSplit();
    if (midKey != null) {
      /** Request the split */
      requestSplit(r, midKey);
      return true;
    }
  }
  return false;
}
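Whether checkSplit() returns a split point at all depends on the split policy's size threshold, which in IncreasingToUpperBoundRegionSplitPolicy grows with the number of regions the table already has on this region server. A minimal sketch of that check, paraphrased from memory of the 1.2.x policy rather than quoted verbatim (initialSize defaults to twice the memstore flush size):

/**
 * Paraphrased sketch of the size check in IncreasingToUpperBoundRegionSplitPolicy
 * (HBase 1.2.x), not the verbatim source. A store is considered big enough to
 * split once it crosses this threshold.
 */
long getSizeToCheck(final int tableRegionsCount, final long initialSize,
    final long desiredMaxFileSize) {
  // No regions yet (or a count so large the cube could overflow): fall back
  // to hbase.hregion.max.filesize.
  if (tableRegionsCount == 0 || tableRegionsCount > 100) {
    return desiredMaxFileSize;
  }
  // Otherwise the threshold grows cubically with the region count, capped at
  // hbase.hregion.max.filesize.
  return Math.min(desiredMaxFileSize,
      initialSize * tableRegionsCount * tableRegionsCount * tableRegionsCount);
}

So a fresh table splits early (around 2 x 128 MB with default settings), and the threshold quickly converges on hbase.hregion.max.filesize as regions accumulate.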
which in turn calls:
public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
  /** If no split point was found, log and return */
  if (midKey == null) {
    LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
        " not splittable because midkey=null");
    if (((HRegion)r).shouldForceSplit()) {
      ((HRegion)r).clearSplit();
    }
    return;
  }
  /** Build a split request task and hand it to the thread pool */
  try {
    this.splits.execute(new SplitRequest(r, midKey, this.server, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split requested for " + r + ". " + this);
    }
  } catch (RejectedExecutionException ree) {
    LOG.info("Could not execute split for " + r, ree);
  }
}
The run() method of the SplitRequest built above then does the work:
@Override
public void run() {
  if (this.server.isStopping() || this.server.isStopped()) {
    LOG.debug("Skipping split because server is stopping=" +
        this.server.isStopping() + " or stopped=" + this.server.isStopped());
    return;
  }
  doSplitting(user);
}
Next, the doSplitting method:
private void doSplitting(User user) {
  boolean success = false;
  server.metricsRegionServer.incrSplitRequest();
  long startTime = EnvironmentEdgeManager.currentTime();
  SplitTransactionImpl st = new SplitTransactionImpl(parent, midKey);
  try {
    // acquire a shared read lock on the table, so that table schema modifications
    // do not happen concurrently
    /** Take the table lock */
    tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName(),
        "SPLIT_REGION:" + parent.getRegionInfo().getRegionNameAsString());
    try {
      tableLock.acquire();
    } catch (IOException ex) {
      tableLock = null;
      throw ex;
    }
    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    /** Prepare phase: the first step described above, which news up the two
     *  daughter HRegions */
    if (!st.prepare()) return;
    try {
      /** Actually execute the split */
      st.execute(this.server, this.server, user);
      success = true;
    } catch (Exception e) {
      // ... omitted ...
    }
  } catch (IOException ex) {
    LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
    server.checkFileSystem();
  } finally {
    // ... omitted ...
    // Always log the split transaction journal
    LOG.info("Split transaction journal:\n\t" + StringUtils.join("\n\t", st.getJournal()));
  }
}
Prepare phase: the key code constructs the two daughter regions:
this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
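To make the key ranges concrete, here is a small hypothetical illustration of what those two HRegionInfo instances describe (the table name, keys, and region id below are made up for the example):

// Classes: org.apache.hadoop.hbase.{TableName, HRegionInfo},
// org.apache.hadoop.hbase.util.Bytes
// Parent region covers ["a", "z"); suppose checkSplit() picked midKey "m".
TableName table = TableName.valueOf("my_table");
byte[] startKey = Bytes.toBytes("a");
byte[] endKey   = Bytes.toBytes("z");
byte[] splitrow = Bytes.toBytes("m");
long rid = System.currentTimeMillis();

// Daughter A serves ["a", "m") and daughter B serves ["m", "z"): together they
// cover exactly the parent's range with no overlap and no gap.
HRegionInfo hri_a = new HRegionInfo(table, startKey, splitrow, false, rid);
HRegionInfo hri_b = new HRegionInfo(table, splitrow, endKey, false, rid);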
The entry point of the execute phase:
@Override
public PairOfSameType<Region> execute(final Server server,
    final RegionServerServices services, User user) throws IOException {
  this.server = server;
  this.rsServices = services;
  useZKForAssignment = server == null ? true :
      ConfigUtil.useZKForAssignment(server.getConfiguration());
  if (useCoordinatedStateManager(server)) {
    std = ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().getDefaultDetails();
  }
  /** Key point #1 */
  PairOfSameType<Region> regions = createDaughters(server, services, user);
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      parent.getCoprocessorHost().preSplitAfterPONR();
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().preSplitAfterPONR();
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
  /** Key point #2 */
  regions = stepsAfterPONR(server, services, regions, user);
  /** Key point #3: record in memory that the split of this HRegion has completed */
  transition(SplitTransactionPhase.COMPLETED);
  return regions;
}
Stepping into the first key point, createDaughters, it turns out to delegate to
PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
and stepping further into stepsBeforePONR:
public PairOfSameType<Region> stepsBeforePONR(final Server server,
    final RegionServerServices services, boolean testing) throws IOException {
  if (useCoordinatedStateManager(server)) {
    if (std == null) {
      std = ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitTransactionCoordination().getDefaultDetails();
    }
    /** Start the split transaction: first mark the region to be split as
     *  SPLITTING, then let the master watch for the state change */
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().startSplitTransaction(parent,
            server.getServerName(), hri_a, hri_b);
  } else if (services != null && !useZKForAssignment) {
    if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
        parent.getRegionInfo(), hri_a, hri_b)) {
      throw new IOException("Failed to get ok from master to split " +
          parent.getRegionInfo().getRegionNameAsString());
    }
  }
  /** An in-memory state machine records each step of the split so the whole thing
   *  behaves transactionally; on failure, the recorded steps drive the rollback */
  transition(SplitTransactionPhase.SET_SPLITTING);
  if (useCoordinatedStateManager(server)) {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().waitForSplitTransaction(services, parent,
            hri_a, hri_b, std);
  }
  /** Create a .splits directory under the parent region's directory, then create
   *  the two subdirectories for daughter region A and region B under it */
  this.parent.getRegionFileSystem().createSplitsDir();
  transition(SplitTransactionPhase.CREATE_SPLIT_DIR);

  Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
  Exception exceptionToThrow = null;
  try {
    /** Close the parent region. Closing forces a memstore flush, because all of
     *  this HRegion's memstore data must be on disk before the split. Closing
     *  roughly means iterating over all HStores of the region and handing each
     *  to a thread pool, which calls its close method. */
    hstoreFilesToSplit = this.parent.close(false);
  } catch (Exception e) {
    exceptionToThrow = e;
  }
  // ... omitted ...

  // TODO: If splitStoreFiles were multithreaded would we complete steps in
  // less elapsed time? St.Ack 20100920
  //
  // splitStoreFiles creates daughter region dirs under the parent splits dir
  // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
  // clean this up.
  /** Create two reference files for every storeFile of the parent. HDFS and META
   *  keep information pointing back at the parent region's files; these reference
   *  files are deleted when the daughter regions are rewritten by a major
   *  compaction. */
  Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

  // Log to the journal that we are creating region A, the first daughter
  // region. We could fail halfway through. If we do, we could have left
  // stuff in fs that needs cleanup -- a storefile or two. Thats why we
  // add entry to journal BEFORE rather than AFTER the change.
  transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
  /** Create HRegion A and B from the daughters' metadata; in effect this creates
   *  A's and B's actual storage directories */
  assertReferenceFileCount(expectedReferences.getFirst(),
      this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
  /** Create daughter HRegion A's storage directory */
  Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
  assertReferenceFileCount(expectedReferences.getFirst(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

  // Ditto
  transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
  assertReferenceFileCount(expectedReferences.getSecond(),
      this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
  /** Create daughter HRegion B's storage directory */
  Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
  assertReferenceFileCount(expectedReferences.getSecond(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

  return new PairOfSameType<Region>(a, b);
}
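The "two reference files per storefile" created by splitStoreFiles above are HBase Reference files: tiny metadata files that point back at a parent HFile and the split row, so no data is copied at split time. A minimal sketch of the idea using org.apache.hadoop.hbase.io.Reference from 1.2.x (simplified; the real splitStoreFiles also handles file naming and the .splits directory layout):

// splitRow is the chosen split point (the midKey from earlier).
byte[] splitRow = midKey;
// Daughter A references the bottom half (keys before splitRow) of each parent
// HFile and daughter B the top half (keys at or after splitRow). The Reference
// file disappears once a major compaction rewrites the daughter's data.
Reference bottom = Reference.createBottomReference(splitRow);
Reference top    = Reference.createTopReference(splitRow);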
startSplitTransaction
@Override
public void startSplitTransaction(HRegion parent, ServerName serverName,
    HRegionInfo hri_a, HRegionInfo hri_b) throws IOException {
  HRegionInfo region = parent.getRegionInfo();
  try {
    LOG.debug(watcher.prefix("Creating ephemeral node for " +
        region.getEncodedName() + " in PENDING_SPLIT state"));
    byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
    RegionTransition rt = RegionTransition.createRegionTransition(
        RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
    String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
    /** Create an ephemeral znode marking this HRegion as SPLITTING, watched by
     *  the master */
    if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
      throw new IOException("Failed create of ephemeral " + node);
    }
  } catch (KeeperException e) {
    throw new IOException("Failed creating PENDING_SPLIT znode on " +
        parent.getRegionInfo().getRegionNameAsString(), e);
  }
}
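For orientation: ZKAssign.getNodeName builds the node path under the region-in-transition parent, so with default settings the ephemeral node created here is /hbase/region-in-transition/<encoded region name>. The master's AssignmentManager watches that tree, which is how the master learns about and tracks the split.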
The second key point, stepsAfterPONR:
public PairOfSameType<Region> stepsAfterPONR(final Server server,
    final RegionServerServices services, final PairOfSameType<Region> regions,
    User user) throws IOException {
  /** Open HRegion A and HRegion B; only once opened can they serve requests.
   *  Opening adds the new HRegions to the online list, updates the zk node and
   *  the hbase:meta metadata so requests can locate the new HRegions, and
   *  finally marks the split as done. */
  openDaughters(server, services, regions.getFirst(), regions.getSecond());
  if (useCoordinatedStateManager(server)) {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().completeSplitTransaction(services,
            regions.getFirst(), regions.getSecond(), std, parent);
  }
  transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
  // Coprocessor callback
  if (parent.getCoprocessorHost() != null) {
    if (user == null) {
      this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
  transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
  return regions;
}
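Worth noting: the parent region is not deleted at this point. Its row in hbase:meta is marked offline and split, with pointers to the two daughters, and the master's CatalogJanitor only garbage-collects the parent's directory and meta entry later, once the daughters' major compactions have removed the last reference files.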
The end-to-end HRegion split flow is also illustrated by a figure in the official HBase documentation.
The fine detail of the actual split is very involved and I have not read all of it; the main path is roughly:
1. Initialize the two daughter HRegions.
2. Create a .splits directory under the parent region's directory, then create the two subdirectories for daughter region A and region B under it.
3. Take the parent HRegion offline (closing an HRegion requires the write lock, and in-flight flush and compaction operations must finish before it closes).
4. Open the daughter HRegions so they can serve requests.
5. Bring the new HRegions online: update the zk node and the hbase:meta metadata table.
6. Mark the split complete in memory. A state machine records each step along the way, and on failure those records drive the rollback (see the sketch after this list).
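To make step 6 concrete, here is a minimal, hypothetical sketch of the journal pattern that SplitTransactionImpl uses. The phase names are the ones that appear in the code above; the class itself is my simplification for illustration, not HBase source:

import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simplification of the journal-based rollback in
 *  SplitTransactionImpl; illustration only, not HBase source. */
class SplitJournal {
  enum Phase {
    SET_SPLITTING, CREATE_SPLIT_DIR, STARTED_REGION_A_CREATION,
    STARTED_REGION_B_CREATION, BEFORE_POST_SPLIT_HOOK, AFTER_POST_SPLIT_HOOK,
    COMPLETED
  }

  private final Deque<Phase> journal = new ArrayDeque<>();

  /** Record a phase BEFORE performing its side effects, so that a crash
   *  mid-phase still leaves a journal entry to clean up from. */
  void transition(Phase phase) {
    journal.push(phase);
  }

  /** On failure, walk the journal in reverse and undo each recorded phase. */
  void rollback() {
    while (!journal.isEmpty()) {
      switch (journal.pop()) {
        case CREATE_SPLIT_DIR:
          // e.g. delete the .splits directory under the parent region
          break;
        case SET_SPLITTING:
          // e.g. clear the SPLITTING state in zk / report back to the master
          break;
        default:
          // the other phases have their own cleanup; past the point of no
          // return (PONR), rollback is impossible and the server must abort
          break;
      }
    }
  }
}

The important property, already visible in the quoted source comments, is that each phase is journaled before its side effects happen, and that rollback is only possible up to the point of no return: a failure after PONR aborts the region server so the master can repair the state.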