
How HRegion splits are triggered in HBase 1.2.3

2018-01-24 08:55

Section 1: When a split is triggered

   1. If the total size of all HFiles in any HStore of an HRegion exceeds the configured default threshold, the HRegion is split.

   2. Once the number of HRegions on an HRegionServer reaches the configured limit, no further splits are performed on that server.

   3. When HFiles are compacted, a split is triggered if the size of the store after compaction exceeds hbase.hregion.max.filesize.

   4. During a flush, if the HRegion has too many HFiles and the flush has not yet waited past the blocking wait time, a split is attempted first.

   5. After a flush completes, the region is checked to see whether it needs to be split.



   6. Manually, from the HBase shell (the split command).
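For triggers 1 and 3, the size a store has to exceed is not always hbase.hregion.max.filesize itself. Under IncreasingToUpperBoundRegionSplitPolicy, the default policy in this version, the threshold is, to my understanding, roughly min(hbase.hregion.max.filesize, initialSize × n³), where n is the number of regions of the same table on this RegionServer. The sketch below models that check together with the region-count limit of trigger 2. It assumes a 128 MB memstore flush size, a 10 GB max file size and an initial size of twice the flush size; the class name, method names and the regionSplitLimit parameter are hypothetical, and details such as size jitter are ignored.

```java
import java.util.List;

// Hypothetical sketch of the split-size check; names and defaults are assumptions.
final class SplitThresholdSketch {
    static final long MB = 1024L * 1024L;
    // Assumed defaults: 128 MB memstore flush size, 10 GB hbase.hregion.max.filesize.
    static final long FLUSH_SIZE = 128 * MB;
    static final long MAX_FILE_SIZE = 10 * 1024 * MB;
    // Assumed: the initial threshold is twice the flush size.
    static final long INITIAL_SIZE = 2 * FLUSH_SIZE;

    // initialSize * n^3, capped at MAX_FILE_SIZE, where n is the number of
    // regions of the same table on this RegionServer.
    static long sizeToCheck(int tableRegionsCount) {
        if (tableRegionsCount == 0 || tableRegionsCount > 100) {
            return MAX_FILE_SIZE; // fall back to the cap (also avoids overflow for large n)
        }
        long n = tableRegionsCount;
        return Math.min(MAX_FILE_SIZE, INITIAL_SIZE * n * n * n);
    }

    // Trigger 2: no split once the server hosts regionSplitLimit regions (hypothetical parameter).
    // Triggers 1 and 3: split if any store is larger than the threshold.
    static boolean shouldSplit(List<Long> storeSizes, int tableRegionsCount,
                               int onlineRegions, int regionSplitLimit) {
        if (onlineRegions >= regionSplitLimit) {
            return false;
        }
        long threshold = sizeToCheck(tableRegionsCount);
        for (long size : storeSizes) {
            if (size > threshold) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            System.out.println(n + " region(s): split above " + sizeToCheck(n) / MB + " MB");
        }
    }
}
```

Running main prints thresholds of 256, 2048, 6912 and 10240 MB for one to four regions, which is why a new table splits early and then settles at the configured maximum.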



 

Section 2: How a split is performed

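    HBase region splitting is complex. Following only the main path, it breaks down into a few steps:

    1. If the split conditions are met, two new HRegions are first initialized from the split point: HRegion A's startKey is the parent's startKey and its endKey is the split point; HRegion B's startKey is the split point and its endKey is the parent's endKey.

    2. The split itself then runs in several stages: create the daughter regions, take the parent region offline, bring the daughters online and update the state in ZooKeeper, and finally complete the split. That is the rough flow; if you are interested in the finer details, read the source code.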
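Step 1 can be illustrated with a small sketch that derives the two daughter key ranges from a split row. It assumes, as with HBase region boundaries, that an empty key means "unbounded" and that keys compare as unsigned bytes (Arrays.compareUnsigned, Java 9+). The class, its methods and the exact validity check (the split row must lie inside the parent and must not equal its start key) are illustrative assumptions, not HBase's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of step 1: computing the daughter key ranges from a split row.
final class DaughterRanges {
    // As in HBase region boundaries, an empty key means "unbounded".
    static final byte[] EMPTY = new byte[0];

    static final class KeyRange {
        final byte[] startKey; // inclusive
        final byte[] endKey;   // exclusive; EMPTY means "to the end of the table"

        KeyRange(byte[] startKey, byte[] endKey) {
            this.startKey = startKey;
            this.endKey = endKey;
        }

        boolean contains(byte[] row) {
            boolean afterStart = Arrays.compareUnsigned(row, startKey) >= 0;
            boolean beforeEnd = endKey.length == 0 || Arrays.compareUnsigned(row, endKey) < 0;
            return afterStart && beforeEnd;
        }
    }

    // Returns {A, B}, or null when the split row cannot split the parent.
    static KeyRange[] split(KeyRange parent, byte[] splitRow) {
        // A split row equal to the start key would leave daughter A empty.
        if (Arrays.equals(splitRow, parent.startKey) || !parent.contains(splitRow)) {
            return null;
        }
        KeyRange a = new KeyRange(parent.startKey, splitRow); // [parent start, split row)
        KeyRange b = new KeyRange(splitRow, parent.endKey);   // [split row, parent end)
        return new KeyRange[] {a, b};
    }

    static byte[] key(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

For a parent covering [b, k) and split row f, daughter A covers [b, f) and daughter B covers [f, k); split rows b and x are both rejected.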

     How the split point is found:

     In HBase, compactions and splits are handled as tasks by dedicated thread pools, inside the class CompactSplitThread:

   

public synchronized boolean requestSplit(final Region r) {
  // don't split regions that are blocking
  /** Decide whether the region may be split at all */
  if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
    /** Find the split point. There are many split policies; the default in this version is
        IncreasingToUpperBoundRegionSplitPolicy. Roughly, the split point is the middle of the
        largest HFile in the region's largest HStore */
    byte[] midKey = ((HRegion)r).checkSplit();
    if (midKey != null) {
      /** Submit the split request */
      requestSplit(r, midKey);
      return true;
    }
  }
  return false;
}
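The rule in the comment above (largest store, then its largest file, then that file's middle key) can be sketched with plain Java types. Keys are Strings for readability, and each file's mid key is simply given, whereas HBase reads it from the HFile itself; all names here are hypothetical.

```java
import java.util.List;

// Hypothetical sketch of how the split point is chosen; names are illustrative.
final class SplitPointSketch {
    static final class StoreFileInfo {
        final long size;
        final String midKey; // in HBase this is read from the HFile; here it is given

        StoreFileInfo(long size, String midKey) {
            this.size = size;
            this.midKey = midKey;
        }
    }

    static final class StoreInfo {
        final List<StoreFileInfo> files;

        StoreInfo(List<StoreFileInfo> files) {
            this.files = files;
        }

        long totalSize() {
            long total = 0;
            for (StoreFileInfo f : files) {
                total += f.size;
            }
            return total;
        }
    }

    // Largest store (by total size), then its largest file, then that file's mid key.
    // Returns null when there is nothing to split, like checkSplit() returning null.
    static String splitPoint(List<StoreInfo> stores) {
        StoreInfo largestStore = null;
        for (StoreInfo s : stores) {
            if (largestStore == null || s.totalSize() > largestStore.totalSize()) {
                largestStore = s;
            }
        }
        if (largestStore == null) {
            return null;
        }
        StoreFileInfo largestFile = null;
        for (StoreFileInfo f : largestStore.files) {
            if (largestFile == null || f.size > largestFile.size) {
                largestFile = f;
            }
        }
        return largestFile == null ? null : largestFile.midKey;
    }
}
```

Note the order of the two choices: a store holding files of 100 and 300 beats a store holding a single file of 350, so the split point comes from the 300 file.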

     This ends up calling:

   

public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
  /** If no split point was found, log it and return */
  if (midKey == null) {
    LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
      " not splittable because midkey=null");
    if (((HRegion)r).shouldForceSplit()) {
      ((HRegion)r).clearSplit();
    }
    return;
  }
  /** Build a split request task and hand it to the thread pool */
  try {
    this.splits.execute(new SplitRequest(r, midKey, this.server, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split requested for " + r + ".  " + this);
    }
  } catch (RejectedExecutionException ree) {
    LOG.info("Could not execute split for " + r, ree);
  }
}
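The hand-off in requestSplit (skip when there is no mid key, otherwise submit a task to the split pool and only log a rejection) follows a common executor pattern. The sketch below reproduces it with JDK classes only; SplitPoolSketch and its single daemon worker are assumptions for illustration, standing in for CompactSplitThread's splits executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of requestSplit's hand-off to a dedicated split pool.
final class SplitPoolSketch {
    // One daemon worker (pool sizing is not modeled here).
    private final ExecutorService splits = Executors.newFixedThreadPool(1, r -> {
        Thread t = new Thread(r, "split-worker");
        t.setDaemon(true);
        return t;
    });

    // Returns true if the split task was queued, false if it was skipped or rejected.
    boolean requestSplit(String regionName, byte[] midKey, Runnable splitTask) {
        if (midKey == null) {
            System.out.println("Region " + regionName + " not splittable because midkey=null");
            return false;
        }
        try {
            splits.execute(splitTask);
            return true;
        } catch (RejectedExecutionException ree) {
            // As in the excerpt, a rejected request is only logged, not retried.
            System.out.println("Could not execute split for " + regionName);
            return false;
        }
    }

    void shutdown() {
        splits.shutdown();
    }
}
```

A shut-down pool rejects new tasks, which is the case the catch block above handles.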

     The run method of the SplitRequest built above skips the split if the server is stopping, and otherwise calls doSplitting:

    

@Override
public void run() {
  if (this.server.isStopping() || this.server.isStopped()) {
    LOG.debug("Skipping split because server is stopping=" +
      this.server.isStopping() + " or stopped=" + this.server.isStopped());
    return;
  }
  doSplitting(user);
}

     Next, the doSplitting method:

   

private void doSplitting(User user) {
  boolean success = false;
  server.metricsRegionServer.incrSplitRequest();
  long startTime = EnvironmentEdgeManager.currentTime();
  SplitTransactionImpl st = new SplitTransactionImpl(parent, midKey);
  try {
    //acquire a shared read lock on the table, so that table schema modifications
    //do not happen concurrently
    /** Take the table lock */
    tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
      , "SPLIT_REGION:" + parent.getRegionInfo().getRegionNameAsString());
    try {
      tableLock.acquire();
    } catch (IOException ex) {
      tableLock = null;
      throw ex;
    }
    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    /** Prepare phase, i.e. step 1 above: build the region info of the two new daughter regions */
    if (!st.prepare()) return;
    try {
      /** Actually perform the split */
      st.execute(this.server, this.server, user);
      success = true;
    } catch (Exception e) {
      // ... omitted ...
    }
  } catch (IOException ex) {
    LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
    server.checkFileSystem();
  } finally {
    // ... omitted ...
    // Always log the split transaction journal
    LOG.info("Split transaction journal:\n\t" + StringUtils.join("\n\t", st.getJournal()));
  }
}
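The control flow of doSplitting (take a shared table lock, prepare, execute, roll back on failure, always release the lock) can be sketched with plain JDK types. This assumes, since the excerpt omits it, that the omitted catch block rolls the transaction back and the omitted finally block releases the table lock. An in-process ReentrantReadWriteLock stands in for HBase's table lock, and SplitTransaction here is a hypothetical interface, not HBase's.

```java
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of doSplitting's control flow; not HBase's API.
final class DoSplittingSketch {
    // Hypothetical stand-in for SplitTransactionImpl.
    interface SplitTransaction {
        boolean prepare();
        void execute() throws Exception;
        void rollback();
    }

    // Stand-in for the table lock: splits take the shared read lock, while a schema
    // change would need the exclusive write lock, so the two cannot overlap.
    static final ReadWriteLock TABLE_LOCK = new ReentrantReadWriteLock();

    static boolean doSplitting(SplitTransaction st, List<String> log) {
        TABLE_LOCK.readLock().lock();
        try {
            if (!st.prepare()) {
                log.add("not ready to split"); // prepare explains why; we just return
                return false;
            }
            try {
                st.execute();
                log.add("split succeeded");
                return true;
            } catch (Exception e) {
                log.add("split failed, rolling back: " + e.getMessage());
                st.rollback(); // undo the journaled steps
                return false;
            }
        } finally {
            TABLE_LOCK.readLock().unlock(); // always released, even on early return
        }
    }
}
```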

     In the prepare phase, the key code builds the HRegionInfo of the two daughter regions:

this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
      The entry point of the execute phase:

    

@Override
public PairOfSameType<Region> execute(final Server server,
    final RegionServerServices services, User user) throws IOException {
  this.server = server;
  this.rsServices = services;
  useZKForAssignment = server == null ? true :
    ConfigUtil.useZKForAssignment(server.getConfiguration());
  if (useCoordinatedStateManager(server)) {
    std =
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitTransactionCoordination().getDefaultDetails();
  }
  /** First key point */
  PairOfSameType<Region> regions = createDaughters(server, services, user);
  if (this.parent.getCoprocessorHost() != null) {
    if (user == null) {
      parent.getCoprocessorHost().preSplitAfterPONR();
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().preSplitAfterPONR();
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
  /** Second key point */
  regions = stepsAfterPONR(server, services, regions, user);
  /** Third key point: record in memory that this region's split has completed */
  transition(SplitTransactionPhase.COMPLETED);
  return regions;
}

    Drilling into the first key point, createDaughters, shows that it calls:

  

PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);


    Next, look inside stepsBeforePONR (PONR stands for "point of no return"):

   

public PairOfSameType<Region> stepsBeforePONR(final Server server,
    final RegionServerServices services, boolean testing) throws IOException {
  if (useCoordinatedStateManager(server)) {
    if (std == null) {
      std =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitTransactionCoordination().getDefaultDetails();
    }
    /** Start the split transaction: this method first marks the region being split as
        splitting, then lets the master know so that it can watch the node */
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
      .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
        hri_a, hri_b);
  } else if (services != null && !useZKForAssignment) {
    if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
        parent.getRegionInfo(), hri_a, hri_b)) {
      throw new IOException("Failed to get ok from master to split "
        + parent.getRegionInfo().getRegionNameAsString());
    }
  }
  /** An in-memory state machine records each step of the split so that it behaves like a
      transaction: if something fails, the recorded steps are used to roll back */
  transition(SplitTransactionPhase.SET_SPLITTING);
  if (useCoordinatedStateManager(server)) {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
      .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
        hri_b, std);
  }
  /** Create a .splits directory under the parent region's directory; the daughter A and B
      subdirectories inside it are created later, by splitStoreFiles */
  this.parent.getRegionFileSystem().createSplitsDir();
  transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
  Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
  Exception exceptionToThrow = null;
  try {
    /** Close the parent region. Closing forces a memstore flush, because all memstore data
        in the region must be on disk before the split. Roughly, close iterates over all
        HStores of the region, hands them to a thread pool and calls each one's close method */
    hstoreFilesToSplit = this.parent.close(false);
  } catch (Exception e) {
    exceptionToThrow = e;
  }
  // ... omitted ...
  // TODO: If splitStoreFiles were multithreaded would we complete steps in
  // less elapsed time?  St.Ack 20100920
  //
  // splitStoreFiles creates daughter region dirs under the parent splits dir
  // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
  // clean this up.
  /** For each store file of the parent, create two reference files, one per daughter,
      instead of copying data. Because the daughters refer to the parent's files, those files
      stay in HDFS and the parent stays in hbase:meta until the reference files are removed,
      which happens when a daughter rewrites its data during a major compaction */
  Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
  // Log to the journal that we are creating region A, the first daughter
  // region.  We could fail halfway through.  If we do, we could have left
  // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
  // add entry to journal BEFORE rather than AFTER the change.
  transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
  /** The following calls create HRegion A and B from the daughters' region info; in effect
      they create the actual storage directories of A and B */
  assertReferenceFileCount(expectedReferences.getFirst(),
    this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
  /** Create the storage directory of daughter region A */
  Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
  assertReferenceFileCount(expectedReferences.getFirst(),
    new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
  // Ditto
  transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
  assertReferenceFileCount(expectedReferences.getSecond(),
    this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
  /** Create the storage directory of daughter region B */
  Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
  assertReferenceFileCount(expectedReferences.getSecond(),
    new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
  return new PairOfSameType<Region>(a, b);
}
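The reference-file idea behind splitStoreFiles can be modeled in a few lines: each daughter gets a small pointer to one half of a parent file (the bottom half, below the split key, for A; the top half, from the split key on, for B), and reads through that pointer see only their half. This is loosely modeled on HBase's top/bottom half-file references; every class and method name below is hypothetical, and keys are Strings for readability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of split reference files; names are illustrative.
final class ReferenceFileSketch {
    // BOTTOM: keys below the split key (daughter A); TOP: keys from the split key on (daughter B).
    enum Range { BOTTOM, TOP }

    static final class Reference {
        final String parentFile;
        final String splitKey;
        final Range range;

        Reference(String parentFile, String splitKey, Range range) {
            this.parentFile = parentFile;
            this.splitKey = splitKey;
            this.range = range;
        }
    }

    // Two tiny references per parent store file; no key/value data is copied.
    static Reference[] splitStoreFile(String parentFile, String splitKey) {
        return new Reference[] {
            new Reference(parentFile, splitKey, Range.BOTTOM),
            new Reference(parentFile, splitKey, Range.TOP)
        };
    }

    // Reading through a reference exposes only its half of the parent's keys.
    static List<String> read(Reference ref, List<String> parentKeys) {
        List<String> visible = new ArrayList<>();
        for (String key : parentKeys) {
            boolean inBottom = key.compareTo(ref.splitKey) < 0;
            if (inBottom == (ref.range == Range.BOTTOM)) {
                visible.add(key);
            }
        }
        return visible;
    }
}
```

This is why a split is cheap: only pointers are written, and the real rewrite of the data is deferred to the daughters' compactions.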


     The ZooKeeper-based startSplitTransaction:

@Override
public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
    HRegionInfo hri_b) throws IOException {
  HRegionInfo region = parent.getRegionInfo();
  try {
    LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
      + " in PENDING_SPLIT state"));
    byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
    RegionTransition rt =
      RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
        region.getRegionName(), serverName, payload);
    String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
    /** Create an ephemeral znode marking this region as splitting; the master learns of the
        split request through this node */
    if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
      throw new IOException("Failed create of ephemeral " + node);
    }
  } catch (KeeperException e) {
    throw new IOException("Failed creating PENDING_SPLIT znode on "
      + parent.getRegionInfo().getRegionNameAsString(), e);
  }
}

      The second key point, stepsAfterPONR:

     

public PairOfSameType<Region> stepsAfterPONR(final Server server,
    final RegionServerServices services, final PairOfSameType<Region> regions, User user)
    throws IOException {
  /** Open HRegion A and HRegion B; only once they are open can they serve requests. The new
      regions are added to the online-regions list, the ZK node is updated, and hbase:meta is
      updated so that requests can find the new regions; finally the split is marked complete */
  openDaughters(server, services, regions.getFirst(), regions.getSecond());
  if (useCoordinatedStateManager(server)) {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
      .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
        regions.getSecond(), std, parent);
  }
  transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
  // Coprocessor callback
  if (parent.getCoprocessorHost() != null) {
    if (user == null) {
      this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
  transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
  return regions;
}
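Why updating hbase:meta lets requests find the daughters can be seen with a toy model: treat meta as a sorted map from region start key to region name, and route a row to the entry with the greatest start key not above it. This is a simplified, hypothetical model; in real hbase:meta the row keys are full region names and the parent's row is kept, marked as split, rather than overwritten.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of hbase:meta as a sorted map from region start key to region name.
final class MetaLookupSketch {
    // "" is the start key of the table's first region.
    final TreeMap<String, String> meta = new TreeMap<>();

    // A row belongs to the region with the greatest start key <= row.
    String locate(String row) {
        Map.Entry<String, String> e = meta.floorEntry(row);
        return e == null ? null : e.getValue();
    }

    // After the daughters are open, publish them: A reuses the parent's start key
    // (replacing the parent's entry in this simplified model), B starts at the split key.
    void applySplit(String parentStartKey, String splitKey, String daughterA, String daughterB) {
        meta.put(parentStartKey, daughterA);
        meta.put(splitKey, daughterB);
    }
}
```

Before the split every row maps to the parent; afterwards rows below the split key route to A and all others to B.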

      The official HBase documentation includes a diagram of the entire HRegion split flow.

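  The details of the actual split are very complex and I have not studied them closely. The main path is roughly:

    1. Initialize the two daughter HRegions.

    2. Create a .splits directory under the parent region's directory, then create subdirectories for daughter regions A and B inside it.

    3. Take the parent HRegion offline (closing an HRegion takes the region's write lock, and any flush or compaction in progress must finish before it closes).

    4. Open the daughter HRegions so they can accept requests.

    5. Bring the new HRegions online: update the ZK node and the hbase:meta table.

    6. Mark the split as complete in memory. A state machine records each step so that a failed split can be rolled back.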
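The journal described in step 6 can be sketched as a stack of phases that is unwound newest-first on failure. The sketch uses a few phase names from the code above plus a PONR marker, and assumes, as the split between stepsBeforePONR and stepsAfterPONR suggests, that once the point of no return has been recorded the split can no longer be rolled back. SplitJournalSketch is hypothetical and only records what a real rollback would undo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a split journal with rollback; a simplified model, not HBase's class.
final class SplitJournalSketch {
    // A few phases from the code above, plus a point-of-no-return marker.
    enum Phase { SET_SPLITTING, CREATE_SPLIT_DIR, STARTED_REGION_A_CREATION,
                 STARTED_REGION_B_CREATION, PONR, COMPLETED }

    private final Deque<Phase> journal = new ArrayDeque<>();
    final List<String> undone = new ArrayList<>();

    // Record a step, like transition(...) in the code above.
    void transition(Phase phase) {
        journal.push(phase);
    }

    // Undo recorded steps newest-first. Returns false if the PONR was reached:
    // past that point the split cannot be rolled back.
    boolean rollback() {
        if (journal.contains(Phase.PONR)) {
            return false;
        }
        while (!journal.isEmpty()) {
            // A real rollback would reverse this step's side effect here.
            undone.add("undo " + journal.pop());
        }
        return true;
    }
}
```

Recording each phase before its side effect (as the comment about region A in stepsBeforePONR explains) means a crash halfway through a step still leaves that step in the journal to be cleaned up.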