
HBase 0.96.1 HMaster Startup Process Analysis


HBase HMaster startup analysis

HBase debug configuration:

Locate the hbase shell script and edit the matching branch. For example, to debug the master:

elif[ "$COMMAND" = "master" ] ; then

CLASS='org.apache.hadoop.hbase.master.HMaster'
if[ "$1" != "stop" ] && [ "$1" !="clear" ] ; then

HBASE_OPTS="-Xdebug-Xrunjdwp:transport=dt_socket,address=5555,server=y,suspend=y$HBASE_OPTS$HBASE_MASTER_OPTS"

fi
In Eclipse, open the hbase-server project and add a remote debug configuration pointing at port 5555; the setup is the same as the one described in the Hadoop source analysis notes.

Starting the master through the shell script (hbase-daemon.sh start master) invokes HMaster's main method:

public static void main(String[] args) {
  // Print the version information and record it in the log.
  VersionInfo.logVersion();
  // Create an HMasterCommandLine instance and invoke its doMain method with the args.
  // See HMasterCommandLine.doMain below.
  new HMasterCommandLine(HMaster.class).doMain(args);
}

HMasterCommandLine.doMain method

public void doMain(String args[]) {
  try {
    // 1. Create a Configuration instance.
    //    - By default it loads Hadoop's core-default.xml / core-site.xml,
    //      so HBase's classpath should include the Hadoop home directory.
    //      Example hbase-env.sh settings:
    //        HADOOP_HOME="/work/dist/hadoop-2.2.0-cdh5.0.0-beta-2"
    //        JAVA_LIBRARY_PATH="$HADOOP_HOME/lib/native:$HADOOP_HOME/lib/native/Linux-amd64-64"
    // 2. Load hbase-default.xml / hbase-site.xml.
    // 3. Invoke HMasterCommandLine.run.
    int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
    if (ret != 0) {
      System.exit(ret);
    }
  } catch (Exception e) {
    LOG.error("Failed to run", e);
    System.exit(-1);
  }
}
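As a quick aside, the resource layering that HBaseConfiguration.create() performs is easy to observe directly. A minimal sketch (the printed values depend entirely on your local hbase-site.xml and classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ConfDemo {
  public static void main(String[] args) {
    // HBaseConfiguration.create() layers core-default.xml/core-site.xml
    // (from the Hadoop jars on the classpath) and then
    // hbase-default.xml/hbase-site.xml on top.
    Configuration conf = HBaseConfiguration.create();
    // Later resources override earlier ones, so a value set in
    // hbase-site.xml wins over the hbase-default.xml default.
    System.out.println(conf.get("hbase.cluster.distributed"));
    System.out.println(conf.get("hbase.zookeeper.quorum"));
  }
}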

HMasterCommandLine.run method

run inspects the command argument to decide between start and stop; for start it calls startMaster.
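The dispatch itself is simple. A simplified sketch of the idea, not the verbatim 0.96 source (argument parsing and usage output omitted):

// Simplified sketch of the start/stop dispatch in HMasterCommandLine.run
// (not the verbatim source; the real method also validates arguments).
public int run(String[] args) throws Exception {
  String command = args.length > 0 ? args[0] : "";
  if ("start".equals(command)) {
    return startMaster();
  } else if ("stop".equals(command)) {
    return stopMaster();
  }
  return 1; // unknown command
}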

HMasterCommandLine.startMaster method

private int startMaster() {
  Configuration conf = getConf();
  try {
    // If 'local', defer to LocalHBaseCluster instance. Starts master
    // and regionserver both in the one JVM.
    // Checks whether hbase.cluster.distributed is false; if so, run in
    // standalone mode (zk/master/regionserver all in one JVM).
    if (LocalHBaseCluster.isLocal(conf)) {
      final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
      File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));
      int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
      if (zkClientPort == 0) {
        throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT);
      }
      zooKeeperCluster.setDefaultClientPort(zkClientPort);

      // login the zookeeper server principal (if using security)
      ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file",
        "hbase.zookeeper.server.kerberos.principal", null);

      int clientPort = zooKeeperCluster.startup(zkDataPath);
      if (clientPort != zkClientPort) {
        ........ some code omitted here
        throw new IOException(errorMsg);
      }
      conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
      // Need to have the zk cluster shutdown when master is shutdown.
      // Run a subclass that does the zk cluster shutdown on its way out.
      LocalHBaseCluster cluster = new LocalHBaseCluster(conf,
        conf.getInt("hbase.masters", 1),
        conf.getInt("hbase.regionservers", 1),
        LocalHMaster.class, HRegionServer.class);
      ((LocalHMaster) cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
      cluster.startup();
      waitOnMasterThreads(cluster);
    } else {
      logProcessInfo(getConf());
      // Construct the HMaster instance via HMaster(final Configuration conf).
      HMaster master = HMaster.constructMaster(masterClass, conf);
      if (master.isStopped()) {
        LOG.info("Won't bring the Master up as a shutdown is requested");
        return 1;
      }
      // Start the HMaster; see HMaster.start below.
      master.start();
      master.join();
      if (master.isAborted())
        throw new RuntimeException("HMaster Aborted");
    }
  } catch (Throwable t) {
    LOG.error("Master exiting", t);
    return 1;
  }
  return 0;
}
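For illustration, LocalHBaseCluster.isLocal(conf) amounts to checking a single flag; a minimal sketch, assuming the default of false for hbase.cluster.distributed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LocalModeCheck {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Standalone mode when hbase.cluster.distributed is false (the default):
    // MiniZooKeeperCluster, master and regionserver then share one JVM.
    boolean local = !conf.getBoolean("hbase.cluster.distributed", false);
    System.out.println("standalone mode: " + local);
  }
}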

HMaster constructor

public HMaster(final Configuration conf)
    throws IOException, KeeperException, InterruptedException {
  this.conf = new Configuration(conf);
  // Disable the block cache on the master.
  // Since this is the master, the HFile block cache is not needed,
  // so hfile.block.cache.size is forced to 0.
  this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
  // Decide whether checksums are skipped on HDFS short-circuit local reads
  // (dfs.client.read.shortcircuit.skip.checksum, default false);
  // hbase.regionserver.checksum.verify controls whether the regionserver
  // verifies data after reading it.
  FSUtils.setupShortCircuitRead(conf);
  // Server to handle client requests.
  // Resolve the local hostname through DNS.
  String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
    conf.get("hbase.master.dns.interface", "default"),
    conf.get("hbase.master.dns.nameserver", "default")));

  // The master RPC port, hbase.master.port, defaults to 60000.
  int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
  // Test that the hostname is reachable.
  InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
  if (initialIsa.getAddress() == null) {
    throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
  }
  // Verify that the bind address is reachable if set.
  // If hbase.master.ipc.address names an IP or hostname, bind to that address.
  String bindAddress = conf.get("hbase.master.ipc.address");
  if (bindAddress != null) {
    initialIsa = new InetSocketAddress(bindAddress, port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
    }
  }
  String name = "master/" + initialIsa.toString();
  // Set how many times to retry talking to another server over HConnection.
  // The server-to-server retry count is hbase.client.retries.number
  // (default 31) multiplied by hbase.client.serverside.retries.multiplier
  // (default 10).
  HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
  // hbase.master.handler.count sets the master handler thread count,
  // falling back to hbase.regionserver.handler.count, then to 25.
  int numHandlers = conf.getInt("hbase.master.handler.count",
    conf.getInt("hbase.regionserver.handler.count", 25));
  this.rpcServer = new RpcServer(this, name, getServices(),
    initialIsa, // BindAddress is IP we got for this server.
    numHandlers,
    0, // we don't use high priority handlers in master
    conf,
    0); // this is a DNC w/o high priority handlers
  // Set our address.
  this.isa = this.rpcServer.getListenerAddress();
  // We don't want to pass isa's hostname here since it could be 0.0.0.0
  this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
  this.rsFatals = new MemoryBoundedLogMessageBuffer(
    conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

  // login the zookeeper client principal (if using security)
  ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
    "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());

  // initialize server principal (if using secure Hadoop)
  UserProvider provider = UserProvider.instantiate(conf);
  provider.login("hbase.master.keytab.file",
    "hbase.master.kerberos.principal", this.isa.getHostName());

  ........ some code omitted here

  // set the thread name now we have an address
  setName(MASTER + ":" + this.serverName.toShortString());
  // Check whether cross-cluster replication (hbase.replication) is enabled;
  // see http://blog.csdn.net/teriy/article/details/7954203
  Replication.decorateMasterConfiguration(this.conf);

  // Hack! Maps DFSClient => Master for logs. HDFS made this
  // config param for task trackers, but we can piggyback off of it.
  if (this.conf.get("mapred.task.id") == null) {
    this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
  }
  // Create the ZK watcher (the quorum is configured by
  // hbase.zookeeper.quorum) and initialize the znodes.
  this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
  this.rpcServer.startThreads();
  this.pauseMonitor = new JvmPauseMonitor(conf);
  this.pauseMonitor.start();

  // metrics interval: using the same property as region server.
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  // should we check the compression codec type at master side, default true, HBASE-6370
  this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

  this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

  // Health checker thread.
  // If hbase.node.health.script.location names a node health check script:
  // - hbase.node.health.script.frequency sets the check interval, default 10000
  // - hbase.node.health.script.timeout sets the script timeout, default 60000
  // - hbase.node.health.failure.threshold sets the tolerated failure count, default 3
  // When a script is configured, a HealthCheckChore instance is created.
  int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
    HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
  if (isHealthCheckerConfigured()) {
    healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
  }

  // Do we publish the status?
  // hbase.status.published toggles cluster status publishing, default false;
  // hbase.status.publisher.class selects the publisher implementation,
  // default ClusterStatusPublisher.MulticastPublisher.
  boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
    HConstants.STATUS_PUBLISHED_DEFAULT);
  Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
    conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
      ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
      ClusterStatusPublisher.Publisher.class);

  if (shouldPublish) {
    if (publisherClass == null) {
      LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
        ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
        " is not set - not publishing status");
    } else {
      clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
      Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
    }
  }
  // hbase.master.distributed.log.replay toggles distributed log replay for
  // log splitting, default false. When true, recovery replays WALs directly
  // without writing recovered.edits per region, and regions can accept other
  // writes during replay.
  distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
    HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
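To make the retry arithmetic in the constructor concrete, here is a worked sketch of what setServerSideHConnectionRetries amounts to, assuming the defaults (illustrative, not the helper's verbatim body):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ServerSideRetries {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Server-to-server retries = client retries * server-side multiplier.
    int clientRetries = conf.getInt("hbase.client.retries.number", 31);
    int multiplier = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
    int serverSideRetries = clientRetries * multiplier; // 31 * 10 = 310 by default
    conf.setInt("hbase.client.retries.number", serverSideRetries);
    System.out.println("server-side retries: " + serverSideRetries);
  }
}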

HMaster.start method

master.start simply starts the master thread, which enters HMaster.run:

public void run() {
  MonitoredTask startupStatus = TaskMonitor.get().createStatus("Master startup");
  startupStatus.setDescription("Master startup");
  masterStartTime = System.currentTimeMillis();
  try {
    // Create the active-master tracker; zookeeper.znode.master configures
    // the master znode path in ZK, default "master".
    this.masterAddressManager = new MasterAddressTracker(getZooKeeperWatcher(), this);
    this.masterAddressManager.start();

    // Put up info server.
    // The web UI; hbase.master.info.port defaults to 60010.
    int port = this.conf.getInt("hbase.master.info.port", 60010);
    if (port >= 0) {
      String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
      this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
      this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
      this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
      this.infoServer.setAttribute(MASTER, this);
      this.infoServer.start();
    }

    this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();

    // becomeActiveMaster(startupStatus) does the following:
    // 1. Creates an ActiveMasterManager instance.
    // 2. Checks hbase.master.backup to see whether this master is a backup
    //    master. An active master does nothing here; a backup master keeps
    //    checking (within zookeeper.session.timeout, default 180000 ms)
    //    whether an active master exists, and waits in stallIfBackupMaster
    //    until the active master has started.
    // 3.1 Creates and starts a ClusterStatusTracker, which repeatedly checks
    //     whether the znode configured by zookeeper.znode.state exists
    //     (which also indicates whether the cluster is up); the default
    //     path is "running".
    // 3.2 blockUntilBecomingActiveMaster: the first master to get here
    //     becomes the active master, because until an active master has
    //     registered, all backup masters are still parked in
    //     stallIfBackupMaster. A non-backup (active) master writes itself
    //     to the master znode (zookeeper.znode.master, default "master"),
    //     registering itself.
    // 3.3 If this master became active (it created the master znode), it
    //     deletes its own entry under the backup-masters path
    //     (zookeeper.znode.backup.masters, default "backup-masters").
    // 3.4 At that point all backup masters waiting in stallIfBackupMaster
    //     proceed into blockUntilBecomingActiveMaster.
    // 4. A backup master registers its own address under the configured
    //    backup-masters path, i.e. /hbase/backup-masters/ip,port,startcode
    //    (zookeeper.znode.backup.masters, default "backup-masters").
    // 5. Backup masters then keep waiting until the current active master
    //    shuts down; ActiveMasterManager.nodeDeleted wakes all waiting
    //    threads so a new active master can be elected.
    becomeActiveMaster(startupStatus);

    // We are either the active master or we were asked to shutdown
    if (!this.stopped) {
      // Call finishInitialization, the most important part of startup:
      // 1. Marks this node as the active master (isActiveMaster = true).
      // 2. Creates the masterFileSystem: the HBase rootdir/tmpdir, the
      //    distributed-log-replay flag, the SplitLogManager, and the
      //    cluster id for this startup.
      // 2.1 From rootdir, creates the FSTableDescriptors instance
      //     (roughly, the root paths of the tables).
      // 2.2 Registers the cluster id in ZK (zookeeper.znode.clusterId,
      //     default "hbaseid").
      // 2.3 Creates the master's ExecutorService and the ServerManager
      //     that manages regionservers.
      // 3. Initializes the ZK-based trackers
      //    (see HMaster.initializeZKBasedSystemTrackers below).
      // 4.1 Creates the MasterCoprocessorHost, which mediates table schema
      //     changes and region state updates.
      // 4.2 Calls startServiceThreads (see HMaster.startServiceThreads below).
      // 4.3 Calls this.serverManager.waitForRegionServers(status) and blocks
      //     the main thread until regionservers check in
      //     (see ServerManager.waitForRegionServers below).
      // 4.4 Any online server known to regionServerTracker but missing from
      //     serverManager's online list is added to it.
      // 4.5 Decides whether the meta region needs log replay (the server of
      //     a WALs/xx-splitting directory is not online, and the previous
      //     meta location is on a non-online server).
      // 4.6 Log splitting: with distributed log replay enabled, the logs are
      //     registered under the znode configured by
      //     zookeeper.znode.recovering.regions (default "recovering-regions")
      //     and not split; otherwise they go under zookeeper.znode.splitlog
      //     (default "splitWAL") and are split.
      // 4.7 Assigns the meta region, handles dead regionservers, and runs
      //     the balancer.
      finishInitialization(startupStatus, false);
      ........ some code omitted here
    }
  } catch (Throwable t) {
    // HBASE-5680: Likely hadoop23 vs hadoop20.x/1.x incompatibility
    ........ some code omitted here
  } finally {
    ........ some code omitted here (the HMaster has been stopped)
    LOG.info("HMaster main thread exiting");
  }
}
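The election in blockUntilBecomingActiveMaster rests on ZooKeeper's guarantee that only one client can create a given ephemeral znode. A minimal standalone sketch of that pattern using the plain ZooKeeper API (paths, data, and session handling simplified; this is not the HBase code itself):

import org.apache.zookeeper.*;

public class MasterElectionSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 180000, event -> { });
    try {
      // Whoever creates the ephemeral znode first is the active master;
      // the znode disappears when that session dies, triggering re-election.
      zk.create("/hbase/master", "host,60000,1234567890".getBytes(),
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      System.out.println("became active master");
    } catch (KeeperException.NodeExistsException e) {
      // Someone else is active; watch the znode and wait for deletion,
      // as a backup master does.
      zk.exists("/hbase/master", event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
          System.out.println("active master gone; retry election");
        }
      });
    }
  }
}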

HMaster.initializeZKBasedSystemTrackers method

void initializeZKBasedSystemTrackers() throws IOException,
    InterruptedException, KeeperException {
  // Creates the tracker for the meta table location (MetaRegionTracker),
  // which waits for a regionserver to register the meta region.
  // zookeeper.znode.metaserver configures the meta znode path in ZK,
  // default "meta-region-server".
  this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
  this.catalogTracker.start();
  // Resolve and instantiate the load balancer implementation, configured by
  // hbase.master.loadbalancer.class; the default implementation is
  // StochasticLoadBalancer (see its javadoc).
  this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
  // Tracker for the load balancer znode; the balancer registers in ZK when
  // it is enabled. Configured by zookeeper.znode.balancer, default "balancer".
  this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
  this.loadBalancerTracker.start();
  // The region assignment manager, which manages region assignment.
  // Its tuning knobs:
  // 1. hbase.assignment.timeout.management: enable assign timeouts, default false.
  // 2. hbase.master.assignment.timeoutmonitor.period: timeout check interval, default 30000 ms.
  // 3. hbase.master.assignment.timeoutmonitor.timeout: assign timeout, default 600000 ms.
  // 4. hbase.master.assignment.timerupdater.period: timer update interval, default 10000.
  // 5. hbase.assignment.maximum.attempts: max assign attempts (must be >= 1), default 10.
  // 6. hbase.meta.assignment.retry.sleeptime: meta assign retry interval, default 1000 ms.
  // 7. hbase.assignment.threads.max: region assign worker threads, default 30.
  // 8. hbase.bulk.assignment.waittillallassigned: whether a bulk assign waits
  //    for all assigns to finish before proceeding, default false.
  // 9. hbase.bulk.assignment.threshold.regions: region count threshold for bulk assign.
  // 10. hbase.bulk.assignment.threshold.servers: server count threshold for bulk assign.
  //     Bulk assign: when the number of regions to assign >= the threshold in 9
  //     and the number of target regionservers >= the threshold in 10, the
  //     assignment is treated as bulk; assigning one region at a time would
  //     perform poorly, so the bulk path is used instead.
  // 11. hbase.assignment.zkevent.workers: threads firing ZK events during
  //     assignment, default 20.
  this.assignmentManager = new AssignmentManager(this, serverManager,
    this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
    this.tableLockManager);
  zooKeeper.registerListenerFirst(assignmentManager);
  // Tracks regionserver registration and deregistration; it maintains the
  // set of live regionservers. The ZK registration path is configured by
  // zookeeper.znode.rs, default "rs"; the ZK callbacks work together with
  // ServerManager to manage the regionservers.
  // When an RS goes down (its znode is deleted from ZK), ServerManager's
  // expireServer fires, which triggers ServerShutdownHandler.process: it
  // scans the meta table for all regions recorded on that server and asks
  // the assignmentManager to reassign them.
  this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
    this.serverManager);
  this.regionServerTracker.start();
  // The draining tracker: a regionserver that must not receive any more
  // regions is listed under the path configured by zookeeper.znode.draining.rs
  // (default "draining"); drainingServerTracker records it and hands it to
  // ServerManager.
  this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
    this.serverManager);
  this.drainingServerTracker.start();

  // Set the cluster as up. If new RSs, they'll be waiting on this before
  // going ahead with their startup.
  boolean wasUp = this.clusterStatusTracker.isClusterUp();
  if (!wasUp) this.clusterStatusTracker.setClusterUp();
  ........ some code omitted here
  // create the snapshot manager
  this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
}
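Trackers such as RegionServerTracker are thin wrappers over a ZooKeeper children watch. A minimal sketch of the underlying pattern with the plain ZooKeeper API (paths simplified, error handling omitted; not the HBase classes themselves):

import org.apache.zookeeper.*;
import java.util.List;

public class RsTrackerSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 180000, event -> { });
    watchRegionServers(zk);
    Thread.sleep(Long.MAX_VALUE); // keep watching
  }

  // List the live regionservers and re-arm the watch; when an RS znode
  // vanishes, the master would expire that server and reassign its regions.
  static void watchRegionServers(ZooKeeper zk) throws Exception {
    List<String> servers = zk.getChildren("/hbase/rs", event -> {
      if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
        try {
          watchRegionServers(zk); // diff old/new lists, expire missing servers
        } catch (Exception ignored) { }
      }
    });
    System.out.println("online regionservers: " + servers);
  }
}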

HMaster.startServiceThreads method

void startServiceThreads() throws IOException {
  // Start the executor service pools.
  this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
    conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
    conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
    conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
    conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
    conf.getInt("hbase.master.executor.logreplayops.threads", 10));

  // We depend on there being only one instance of this executor running
  // at a time. To do concurrency, would need fencing of enable/disable of
  // tables.
  // Single thread handling table operations: creating/modifying tables,
  // merging regions, and so on.
  this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);

  // The run interval for the cleaner chores.
  String n = Thread.currentThread().getName();
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  // Cleans the .oldlogs directory under the HBase root:
  // 1. hbase.master.logcleaner.ttl: TTL of files in .oldlogs,
  //    default 600000 ms = 10 minutes.
  // 2. hbase.master.logcleaner.plugins: comma-separated filters that select
  //    the files to delete, default TimeToLiveLogCleaner.
  this.logCleaner = new LogCleaner(cleanerInterval,
    this, conf, getMasterFileSystem().getFileSystem(),
    getMasterFileSystem().getOldLogDir());
  Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");

  // start the hfile archive cleaner thread
  // Cleans the archive directory under the HBase root:
  // 1. hbase.master.hfilecleaner.plugins: filters that select the HFiles to
  //    delete, default TimeToLiveHFileCleaner.
  // 2. hbase.master.hfilecleaner.ttl: HFile TTL, default 60000*5 = 5 minutes.
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
    getMasterFileSystem().getFileSystem(), archiveDir);
  Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");

  // Start the health checker thread (the HealthCheckChore instance created
  // in the HMaster constructor).
  if (this.healthCheckChore != null) {
    Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
  }

  // Start allowing requests to happen.
  this.rpcServer.openServer();
  this.rpcServerOpen = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
}
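The TTL-based cleaners boil down to "delete what is older than a cutoff". A minimal sketch of a TimeToLiveLogCleaner-style filter against a local directory (using java.io.File instead of the Hadoop FileSystem API; the path is made up for illustration):

import java.io.File;

public class TtlCleanerSketch {
  public static void main(String[] args) {
    long ttlMs = 600_000L; // hbase.master.logcleaner.ttl default: 10 minutes
    long cutoff = System.currentTimeMillis() - ttlMs;
    File oldLogDir = new File("/tmp/hbase/.oldlogs"); // illustrative path
    File[] files = oldLogDir.listFiles();
    if (files == null) return;
    for (File f : files) {
      // A file becomes deletable once its last modification is older than
      // the TTL; the real cleaners chain several such filters (the
      // configured plugins) before deleting.
      if (f.isFile() && f.lastModified() < cutoff) {
        System.out.println("would delete " + f);
      }
    }
  }
}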

ServerManager.waitForRegionServers method
1. hbase.master.wait.on.regionservers.interval: the interval between checks while waiting for regionservers to start, default 1500 ms.
2. hbase.master.wait.on.regionservers.timeout: the overall wait timeout, default 4500 ms.
3. hbase.master.wait.on.regionservers.mintostart: the minimum number of regionservers that must be up, default 1, never less than 1.
4. hbase.master.wait.on.regionservers.maxtostart: the maximum number of regionservers to wait for, default Integer.MAX_VALUE.
5. maxtostart must not be less than mintostart.

public void waitForRegionServers(MonitoredTask status)
    throws InterruptedException {
  final long interval = this.master.getConfiguration().
    getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
  final long timeout = this.master.getConfiguration().
    getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
  int minToStart = this.master.getConfiguration().
    getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
  if (minToStart < 1) {
    ........ some code omitted here
    minToStart = 1;
  }
  int maxToStart = this.master.getConfiguration().
    getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
  if (maxToStart < minToStart) {
    ........ some code omitted here
    maxToStart = Integer.MAX_VALUE;
  }

  long now = System.currentTimeMillis();
  final long startTime = now;
  long slept = 0;
  long lastLogTime = 0;
  long lastCountChange = startTime;
  int count = countOfRegionServers();
  int oldCount = 0;
  // Loop while the master is not stopped and fewer than maxToStart
  // regionservers are up, AND any of the following holds:
  //   - the count changed within the last interval (1500 ms), or
  //   - less than timeout (4500 ms) has elapsed since entering, or
  //   - fewer than minToStart regionservers are up.
  // In other words: wait at least ~4.5 s, and only stop once at least
  // minToStart regionservers have checked in and the count has been
  // stable for one interval.
  while (
    !this.master.isStopped() &&
    count < maxToStart &&
    (lastCountChange + interval > now || timeout > slept || count < minToStart)
  ) {

    // Log some info at every interval time or if there is a change
    if (oldCount != count || lastLogTime + interval < now) {
      lastLogTime = now;
      ........ some code omitted here
      status.setStatus(msg);
    }

    // We sleep for some time
    final long sleepTime = 50;
    Thread.sleep(sleepTime);
    now = System.currentTimeMillis();
    slept = now - startTime;

    oldCount = count;
    count = countOfRegionServers();
    if (count != oldCount) {
      lastCountChange = now;
    }
  }
  ........ some code omitted here
}
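To make the exit condition concrete, here is a small sketch that evaluates the same predicate with sample numbers (standalone arithmetic only, not the HBase class):

public class WaitPredicateSketch {
  // The loop keeps waiting while this returns true (master not stopped and
  // count < maxToStart are assumed to hold here).
  static boolean keepWaiting(long lastCountChange, long interval,
      long now, long timeout, long slept, int count, int minToStart) {
    return lastCountChange + interval > now  // count changed recently
        || timeout > slept                   // still within the 4.5 s window
        || count < minToStart;               // not enough RSes yet
  }

  public static void main(String[] args) {
    // Defaults: interval = 1500 ms, timeout = 4500 ms, minToStart = 1.
    // 5 s in, 1 RS up, count last changed 2 s ago: all three clauses are
    // false, so the wait ends.
    System.out.println(keepWaiting(3000, 1500, 5000, 4500, 5000, 1, 1)); // false
    // 2 s in: even with 1 RS up, the timeout clause still holds, keep waiting.
    System.out.println(keepWaiting(0, 1500, 2000, 4500, 2000, 1, 1));    // true
  }
}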

Region assignment, region split, and related processes remain to be analyzed.