
HBase Source Code Analysis (12): Master and RegionServer Startup Process

2017-07-16 00:13
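Part 9, on the HBase shell startup process, ended at the entry points of HMaster and HRegionServer. In local mode only HMaster needs to be started; internally it also starts ZooKeeper and an HRegionServer. The main flow is:
1) HMaster is a subclass of HRegionServer, so starting HMaster first runs the HRegionServer startup.
2) HRegionServer starts many components, mainly the coordinated state manager, ZooKeeper, RPC, locks, state trackers, and WAL-related classes.
3) HMaster registers itself in ZooKeeper, writing its current ServerName so that clients can find it.
4) HMaster loads meta information into memory, then waits for the RegionServers to report in. For RegionServers that are up but have not reported, it checks ZooKeeper.
5) HMaster starts the load balancer.
6) Health-check mechanisms are started.

`HMaster.java`: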
```java
/**
 * @see org.apache.hadoop.hbase.master.HMasterCommandLine
 */
public static void main(String[] args) {
  VersionInfo.logVersion();
  new HMasterCommandLine(HMaster.class).doMain(args);
}
```
This calls the `run()` method of `HMasterCommandLine.java`:
```java
public int run(String args[]) throws Exception {
  // ... (omitted: mostly argument parsing)
  String command = remainingArgs.get(0);
  if ("start".equals(command)) {
    return startMaster();
  } else if ("stop".equals(command)) {
    return stopMaster();
  } else if ("clear".equals(command)) {
    return (ZNodeClearer.clear(getConf()) ? 0 : 1);
  } else {
    usage("Invalid command: " + command);
    return 1;
  }
}
```
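To see the dispatch on its own, here is a minimal, self-contained sketch of what `run()` does once the options are parsed. The class `MasterCommandDispatchSketch`, its stub actions, and the empty-argument check are hypothetical additions for illustration; in HBase the branches call the real `startMaster()`, `stopMaster()` and `ZNodeClearer.clear(conf)`.

```java
import java.util.List;

/** Hypothetical sketch of the dispatch in HMasterCommandLine.run(); the actions are stubs. */
class MasterCommandDispatchSketch {
  // Stand-ins for startMaster(), stopMaster() and ZNodeClearer.clear(conf).
  static int startMaster() { return 0; }
  static int stopMaster() { return 0; }
  static boolean clearMasterZNode() { return true; }

  /** Returns a process exit code: 0 on success, 1 on failure or an unknown command. */
  static int run(List<String> remainingArgs) {
    if (remainingArgs.isEmpty()) {
      System.err.println("Usage: start | stop | clear");
      return 1;
    }
    String command = remainingArgs.get(0);
    switch (command) {
      case "start":
        return startMaster();
      case "stop":
        return stopMaster();
      case "clear":
        return clearMasterZNode() ? 0 : 1;
      default:
        System.err.println("Invalid command: " + command);
        return 1;
    }
  }
}
```

As in the original, the return value doubles as the process exit code: a recognized command returns the result of its action, and anything else falls through to the usage branch and returns 1.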
The `start` command leads to `startMaster()`:
```java
private int startMaster() {
  Configuration conf = getConf();
  try {
    // Local mode: the master and the RegionServer share one JVM
    if (LocalHBaseCluster.isLocal(conf)) {
      // ... (omitted: ZooKeeper configuration)
      LocalHBaseCluster cluster = new LocalHBaseCluster(conf, mastersCount, regionServersCount,
          LocalHMaster.class, HRegionServer.class);
      ((LocalHMaster) cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
      cluster.startup();
      waitOnMasterThreads(cluster);
    } else {
      // Cluster (distributed) mode
      logProcessInfo(getConf());
      CoordinatedStateManager csm =
          CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
      HMaster master = HMaster.constructMaster(masterClass, conf, csm);
      master.start();
      master.join();
      if (master.isAborted())
        throw new RuntimeException("HMaster Aborted");
    }
  } catch (Throwable t) {
    LOG.error("Master exiting", t);
    return 1;
  }
  return 0;
}
```
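The local/distributed decision can be sketched in isolation. This is a hypothetical model, assuming a plain `java.util.Properties` in place of Hadoop's `Configuration` and bare threads in place of `MasterThread`/`RegionServerThread`; only the property name `hbase.cluster.distributed` and its default of `false` come from HBase. In local mode both roles start in one JVM, in distributed mode only the master does, and in both cases the caller blocks in `join()` the way `master.join()` does above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch of the branching in startMaster(); not the HBase implementation. */
class StartModeSketch {
  /** Local unless hbase.cluster.distributed is "true" (case-insensitive); the default is local. */
  static boolean isLocal(Properties conf) {
    return !Boolean.parseBoolean(conf.getProperty("hbase.cluster.distributed", "false"));
  }

  /**
   * Starts one thread per role and joins them all (compare master.start(); master.join()).
   * Returns the roles that actually ran, sorted, because thread completion order varies.
   */
  static List<String> startAndJoin(Properties conf) {
    List<String> roles = new ArrayList<>();
    roles.add("master");
    if (isLocal(conf)) {
      roles.add("regionserver"); // local mode: both roles live in this one JVM
    }
    List<String> ran = Collections.synchronizedList(new ArrayList<>());
    List<Thread> threads = new ArrayList<>();
    for (String role : roles) {
      Thread t = new Thread(() -> ran.add(role), role); // stand-in for MasterThread / RegionServerThread
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while joining " + t.getName(), e);
      }
    }
    List<String> result = new ArrayList<>(ran);
    Collections.sort(result);
    return result;
  }
}
```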
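1) Local mode. In local mode, `LocalHBaseCluster` creates a `LocalHMaster` and the `HRegionServer` instances, each running in its own `MasterThread` or `RegionServerThread`. The main work here is starting the coordinated state manager `ZkCoordinatedStateManager`, whose `initialize()` creates a whole set of ZooKeeper-based coordination objects: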
```java
@Override
public void initialize(Server server) {
  this.server = server;
  this.watcher = server.getZooKeeper();
  splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
  splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
  splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
  closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
  openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
  regionMergeCoordination = new ZkRegionMergeCoordination(this, watcher);
}
```
Because HMaster extends HRegionServer, both the HMaster and every RegionServer go through this setup. The `HRegionServer` constructor:
```java
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  super("RegionServer");  // thread name
  // ... (omitted: mostly configuration)
  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
  // Set up the ZooKeeper client's JAAS / keytab login settings
  ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
  // Log in as the configured user
  login(userProvider, hostName);
  // Set up the file system
  this.fs = new HFileSystem(this.conf, useHBaseChecksum);
  // Table descriptors, read from the file system
  this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
  // Executor service (thread pools for event handlers)
  service = new ExecutorService(getServerName().toShortString());
  // Tracing: span receivers configured in hbase-site.xml
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
  // Some unit tests don't need a cluster, so no zookeeper at all
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Open connection to zookeeper and set primary watcher (used only in a real cluster)
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());
    // Coordinated state manager
    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();
    // Table lock manager
    tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);
    // Track the active master's address
    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();
    // Track the cluster status
    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  // Start the RPC server that accepts client requests
  rpcServices.start();
  // Start the web UI
  putUpWebUI();
  // WAL roller
  this.walRoller = new LogRoller(this, this);
  // Chore service: runs periodic housekeeping tasks
  this.choreService = new ChoreService(getServerName().toString(), true);
  this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
  // On non-Windows systems, SIGHUP reloads the configuration and notifies
  // all registered configuration observers
  if (!SystemUtils.IS_OS_WINDOWS) {
    Signal.handle(new Signal("HUP"), new SignalHandler() {
      @Override
      public void handle(Signal signal) {
        getConfiguration().reloadConfiguration();
        configurationManager.notifyAllObservers(getConfiguration());
      }
    });
  }
  // Periodically discharge compacted store files that are no longer in use
  this.compactedFileDischarger =
      new CompactedHFilesDischarger(cleanerInterval, (Stoppable) this, (RegionServerServices) this);
  choreService.scheduleChore(compactedFileDischarger);
}
```
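The SIGHUP block in this constructor registers a handler that, on non-Windows systems, reloads the configuration and calls `configurationManager.notifyAllObservers(...)`, so sending `kill -HUP <pid>` lets registered components pick up changed settings without a restart. The observer pattern behind it is sketched below. The classes and the `compaction.threads` key are hypothetical, and the signal itself is left out (the real code relies on the JDK-internal `sun.misc.Signal`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "reload the configuration, then notify observers"; not HBase's classes. */
class ConfigReloadSketch {
  /** Made-up observer interface for this sketch. */
  interface Observer {
    void onConfigurationChange(Map<String, String> newConf);
  }

  private final List<Observer> observers = new ArrayList<>();
  private Map<String, String> conf = Collections.emptyMap();

  synchronized void registerObserver(Observer o) {
    observers.add(o);
  }

  /** What the SIGHUP handler amounts to: swap in the reloaded settings, then tell every observer. */
  synchronized void reload(Map<String, String> reloaded) {
    conf = Collections.unmodifiableMap(new HashMap<>(reloaded));
    for (Observer o : observers) {
      o.onConfigurationChange(conf);
    }
  }

  /** Example observer that caches one setting; the key "compaction.threads" is hypothetical. */
  static class CompactionThreads implements Observer {
    volatile int threads = 1;

    @Override
    public void onConfigurationChange(Map<String, String> newConf) {
      threads = Integer.parseInt(newConf.getOrDefault("compaction.threads", "1"));
    }
  }
}
```

The point of the design is that components never re-read the configuration on their own; they react only when the manager pushes a fresh snapshot to them.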
Some HMaster-specific startup steps, in the `HMaster` constructor:
```java
public HMaster(final Configuration conf, CoordinatedStateManager csm)
    throws IOException, KeeperException, InterruptedException {
  super(conf, csm);
  // Cluster status publisher: e.g. when a RegionServer dies, clients are told right away
  // instead of waiting for a socket timeout
  clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
  getChoreService().scheduleChore(clusterStatusPublisherChore);
  // Some unit tests don't need a cluster, so no zookeeper at all
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Register this master in ZooKeeper
    activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
    int infoPort = putUpJettyServer();
    // Start the active master logic: create a znode in ZooKeeper and write this master's host info into it
    startActiveMasterManager(infoPort);
  } else {
    activeMasterManager = null;
  }
}
```
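Because `HMaster` extends `HRegionServer`, `super(conf, csm)` runs the entire RegionServer constructor shown earlier before any master-specific line executes. A tiny pair of hypothetical classes shows the order Java guarantees:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for HRegionServer. */
class RegionServerSketch {
  final List<String> startupLog = new ArrayList<>();

  RegionServerSketch() {
    // Runs first, even when a subclass is being constructed: the shared setup.
    startupLog.add("regionserver: zookeeper, rpc, wal, chores");
  }
}

/** Hypothetical stand-in for HMaster. */
class MasterSketch extends RegionServerSketch {
  MasterSketch() {
    super(); // HMaster calls super(conf, csm) explicitly
    // Runs second: the master-only setup.
    startupLog.add("master: status publisher, active master manager");
  }
}
```

One consequence worth keeping in mind: if a superclass constructor calls an overridable method, the subclass override runs before the subclass's own field initializers have run.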
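`startActiveMasterManager(infoPort)` is well worth studying. It eventually calls the following method in `ActiveMasterManager`. When this master starts, it tries to write its address into the master znode in ZooKeeper. If the write succeeds, this master is the active master, and it deletes its own entry from the backup-masters directory. If the write fails, another active master already exists, so it checks whether the registered active master has the same host and port as itself. If it does, this master was restarted before its old ephemeral node expired: it deletes the znode in ZooKeeper and the copy saved on local disk, then waits and tries again. Otherwise it waits to become the next active master.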
```java
boolean blockUntilBecomingActiveMaster(
    int checkInterval, MonitoredTask startupStatus) {
  String backupZNode = ZKUtil.joinZNode(
      this.watcher.backupMasterAddressesZNode, this.sn.toString());
  while (!(master.isAborted() || master.isStopped())) {
    startupStatus.setStatus("Trying to register in ZK as active master");
    try {
      if (MasterAddressTracker.setMasterAddress(this.watcher,
          this.watcher.getMasterAddressZNode(), this.sn, infoPort)) {
        // If we were a backup master before, delete our ZNode from the backup
        // master directory since we are the active now)
        if (ZKUtil.checkExists(this.watcher, backupZNode) != -1) {
          LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
          ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
        }
        // Save the znode in a file, this will allow to check if we crash in the launch scripts
        ZNodeClearer.writeMyEphemeralNodeOnDisk(this.sn.toString());
        // We are the master, return
        startupStatus.setStatus("Successfully registered as active master.");
        this.clusterHasActiveMaster.set(true);
        LOG.info("Registered Active Master=" + this.sn);
        return true;
      }
      // There is another active master running elsewhere or this is a restart
      // and the master ephemeral node has not expired yet.
      this.clusterHasActiveMaster.set(true);
      String msg;
      byte[] bytes =
          ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
      if (bytes == null) {
        msg = ("A master was detected, but went down before its address " +
            "could be read.  Attempting to become the next active master");
      } else {
        ServerName currentMaster;
        try {
          currentMaster = ServerName.parseFrom(bytes);
        } catch (DeserializationException e) {
          LOG.warn("Failed parse", e);
          // Hopefully next time around we won't fail the parse.  Dangerous.
          continue;
        }
        if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
          msg = ("Current master has this master's address, " +
              currentMaster + "; master was restarted? Deleting node.");
          // Hurry along the expiration of the znode.
          ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());
          // We may have failed to delete the znode at the previous step, but
          // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
          ZNodeClearer.deleteMyEphemeralNodeOnDisk();
        } else {
          msg = "Another master is the active master, " + currentMaster +
              "; waiting to become the next active master";
        }
      }
      LOG.info(msg);
      startupStatus.setStatus(msg);
    } catch (KeeperException ke) {
      master.abort("Received an unexpected KeeperException, aborting", ke);
      return false;
    }
    synchronized (this.clusterHasActiveMaster) {
      while (clusterHasActiveMaster.get() && !master.isStopped()) {
        try {
          clusterHasActiveMaster.wait(checkInterval);
        } catch (InterruptedException e) {
          // We expect to be interrupted when a master dies,
          //  will fall out if so
          LOG.debug("Interrupted waiting for master to die", e);
        }
      }
      if (clusterShutDown.get()) {
        this.master.stop(
            "Cluster went down before this master became active");
      }
    }
  }
  return false;
}
```
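Stripped of ZooKeeper, one pass through this loop reduces to the decision below. This is a hypothetical in-memory model: a `ConcurrentHashMap` stands in for ZooKeeper, `putIfAbsent` plays the role of creating the master znode (which succeeds for only one caller), and the paths and `host:port` strings are illustrative. Ephemeral-node expiry, watches and the `wait(checkInterval)` loop are not modeled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical, in-memory sketch of one pass through the election loop above.
 * A ConcurrentMap stands in for ZooKeeper; this is not the ZooKeeper API.
 */
class ActiveMasterElectionSketch {
  enum Outcome { BECAME_ACTIVE, DELETED_STALE_NODE, WAIT_AND_RETRY }

  static final String MASTER_ZNODE = "/hbase/master"; // illustrative path
  final ConcurrentMap<String, String> fakeZk = new ConcurrentHashMap<>();

  /** One attempt by the master at hostAndPort whose backup-master entry is backupZNode. */
  Outcome tryBecomeActive(String hostAndPort, String backupZNode) {
    if (fakeZk.putIfAbsent(MASTER_ZNODE, hostAndPort) == null) {
      // We won. If we were registered as a backup master, drop that entry.
      fakeZk.remove(backupZNode);
      return Outcome.BECAME_ACTIVE;
    }
    String current = fakeZk.get(MASTER_ZNODE);
    if (current == null) {
      // The active master vanished before its address could be read: try again.
      return Outcome.WAIT_AND_RETRY;
    }
    if (current.equals(hostAndPort)) {
      // Our own address from a previous run: the old node has not expired yet.
      // Delete it (only if it still holds our address) so the next attempt can win.
      fakeZk.remove(MASTER_ZNODE, current);
      return Outcome.DELETED_STALE_NODE;
    }
    // Another master is active; the real code now waits until that node is deleted.
    return Outcome.WAIT_AND_RETRY;
  }
}
```

Running it for a restarted master shows the two-step recovery: the first attempt deletes the stale node, and the second attempt wins.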
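Once a master has become the active master (rather than a backup), it calls `finishActiveMasterInitialization`. The most important job of this method is to wait for the RegionServers to report in to the HMaster; for RegionServers that have not reported, it checks ZooKeeper for servers that are up but have not registered yet.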
```java
private void finishActiveMasterInitialization(MonitoredTask status)
    throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
  /*
   * We are active master now... go initialize components we need to run.
   * Note, there may be dross in zk from previous runs; it'll get addressed
   * below after we determine if cluster startup or failover.
   */
  status.setStatus("Initializing Master file system");
  this.masterActiveTime = System.currentTimeMillis();
  // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
  this.fileSystemManager = new MasterFileSystem(this, this);
  // Load table descriptors into memory:
  // enable table descriptors cache
  this.tableDescriptors.setCacheOn();
  // set the META's descriptor to the correct replication
  this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
      conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
  // warm-up HTDs cache on master initialization
  if (preLoadTableDescriptors) {
    status.setStatus("Pre-loading table descriptors");
    this.tableDescriptors.getAll();
  }
  // publish cluster ID (write it to ZooKeeper)
  status.setStatus("Publishing Cluster ID in ZooKeeper");
  ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
  this.serverManager = createServerManager(this, this);
  setupClusterConnection();
  // Invalidate all write locks held previously
  this.tableLockManager.reapWriteLocks();
  status.setStatus("Initializing ZK system trackers");
  initializeZKBasedSystemTrackers();
  // initialize master side coprocessors before we start handling requests
  status.setStatus("Initializing master coprocessors");
  this.cpHost = new MasterCoprocessorHost(this, this.conf);
  // start up all service threads.
  status.setStatus("Initializing master service threads");
  startServiceThreads();
  // Wake up this server to check in
  sleeper.skipSleepCycle();
  // Wait for region servers to report in
  this.serverManager.waitForRegionServers(status);
  // Check zk for region servers that are up but didn't register
  for (ServerName sn : this.regionServerTracker.getOnlineServers()) {
    // The isServerOnline check is opportunistic, correctness is handled inside
    if (!this.serverManager.isServerOnline(sn)
        && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
    }
  }
  // get a list for previously failed RS which need log splitting work
  // we recover hbase:meta region servers inside master initialization and
  // handle other failed servers in SSH in order to start up master node ASAP
  Set<ServerName> previouslyFailedServers =
      this.fileSystemManager.getFailedServersFromLogFolders();
  // log splitting for hbase:meta server
  ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
  if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
    splitMetaLogBeforeAssignment(oldMetaServerLocation);
    // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
    // may also host user regions
  }
  Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
  // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
  // instead of previouslyFailedMetaRSs alone to address the following two situations:
  // 1) the chained failure situation(recovery failed multiple times in a row).
  // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
  // same server still has non-meta wals to be replayed so that
  // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
  // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
  // there is no op for the server.
  previouslyFailedMetaRSs.addAll(previouslyFailedServers);
  this.initializationBeforeMetaAssignment = true;
  // Wait for regionserver to finish initialization.
  if (BaseLoadBalancer.tablesOnMaster(conf)) {
    waitForServerOnline();
  }
  // initialize load balancer
  this.balancer.setClusterStatus(getClusterStatus());
  this.balancer.setMasterServices(this);
  this.balancer.initialize();
  // Check if master is shutting down because of some issue
  // in initializing the regionserver or the balancer.
  if (isStopped()) return;
  // Make sure meta assigned before proceeding.
  status.setStatus("Assigning Meta Region");
  assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
  // check if master is shutting down because above assignMeta could return even hbase:meta isn't
  // assigned when master is shutting down
  if (isStopped()) return;
  status.setStatus("Submitting log splitting work for previously failed region servers");
  // Master has recovered hbase:meta region server and we put
  // other failed region servers in a queue to be handled later by SSH
  for (ServerName tmpServer : previouslyFailedServers) {
    this.serverManager.processDeadServer(tmpServer, true);
  }
  // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
  // in meta. This must happen before we assign all user regions or else the assignment will fail.
  if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
    MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
  }
  // Fix up assignment manager status
  status.setStatus("Starting assignment manager");
  this.assignmentManager.joinCluster();
  // set cluster status again after user regions are assigned
  this.balancer.setClusterStatus(getClusterStatus());
  // Start balancer and meta catalog janitor after meta and regions have been assigned.
  status.setStatus("Starting balancer and catalog janitor");
  this.clusterStatusChore = new ClusterStatusChore(this, balancer);
  getChoreService().scheduleChore(clusterStatusChore);
  this.balancerChore = new BalancerChore(this);
  getChoreService().scheduleChore(balancerChore);
  this.normalizerChore = new RegionNormalizerChore(this);
  getChoreService().scheduleChore(normalizerChore);
  this.catalogJanitorChore = new CatalogJanitor(this, this);
  getChoreService().scheduleChore(catalogJanitorChore);
  // Do Metrics periodically
  periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
  getChoreService().scheduleChore(periodicDoMetricsChore);
  status.setStatus("Starting namespace manager");
  initNamespace();
  if (this.cpHost != null) {
    try {
      this.cpHost.preMasterInitialization();
    } catch (IOException e) {
      LOG.error("Coprocessor preMasterInitialization() hook failed", e);
    }
  }
  status.markComplete("Initialization successful");
  LOG.info("Master has completed initialization");
  configurationManager.registerObserver(this.balancer);
  // Set master as 'initialized'.
  setInitialized(true);
  status.setStatus("Starting quota manager");
  initQuotaManager();
  // assign the meta replicas
  Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
  int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
      HConstants.DEFAULT_META_REPLICA_NUM);
  for (int i = 1; i < numReplicas; i++) {
    assignMeta(status, EMPTY_SET, i);
  }
  unassignExcessMetaReplica(zooKeeper, numReplicas);
  // clear the dead servers with same host name and port of online server because we are not
  // removing dead server with same hostname and port of rs which is trying to check in before
  // master initialization. See HBASE-5916.
  this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
  // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
  status.setStatus("Checking ZNode ACLs");
  zooKeeper.checkAndSetZNodeAcls();
  status.setStatus("Calling postStartMaster coprocessors");
  if (this.cpHost != null) {
    // don't let cp initialization errors kill the master
    try {
      this.cpHost.postStartMaster();
    } catch (IOException ioe) {
      LOG.error("Coprocessor postStartMaster() hook failed", ioe);
    }
  }
  zombieDetector.interrupt();
}
```
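The reconciliation step highlighted above (servers that are up in ZooKeeper but have not reported in) boils down to a set difference guarded by an atomic "add if absent". Below is a hypothetical sketch with plain strings as server names; it is not HBase's `ServerManager`, and `onlineCount()` exists only to make the result easy to check.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the "up in ZK but not reported in" loop; not HBase's ServerManager. */
class ServerReconcileSketch {
  // Servers that have reported in to the master.
  private final Set<String> onlineServers = new TreeSet<>();

  synchronized boolean isServerOnline(String sn) {
    return onlineServers.contains(sn);
  }

  /** Records sn unless it is already known; true if it was newly recorded. */
  synchronized boolean checkAndRecordNewServer(String sn) {
    return onlineServers.add(sn);
  }

  synchronized int onlineCount() {
    return onlineServers.size();
  }

  /** Registers each server seen in ZooKeeper that has not reported in, and returns those servers. */
  Set<String> registerUnreported(Set<String> serversInZk) {
    Set<String> registered = new LinkedHashSet<>();
    for (String sn : serversInZk) {
      // Cheap pre-check first; the synchronized add is what actually decides.
      if (!isServerOnline(sn) && checkAndRecordNewServer(sn)) {
        registered.add(sn);
      }
    }
    return registered;
  }
}
```

As the comment in the original loop says, the first check is only opportunistic: if a server reports in between the two calls, the atomic add simply returns `false` and nothing is registered twice.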
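Finally, `run()` is called. It mainly starts health checking and similar components, and then waits until the server is stopped. The RegionServer's `run()` also starts many threads; the most important are MemStoreFlusher and CompactSplitThread, plus the call to registerConfigurationObservers. The MemStore flusher thread will come up again in the next chapter, which covers the flush and split process. These are set up in `initializeThreads()`: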
```java
private void initializeThreads() throws IOException {
  // Cache flushing thread.
  this.cacheFlusher = new MemStoreFlusher(conf, this);
  // Compaction thread
  this.compactSplitThread = new CompactSplitThread(this);
  // Background thread to check for compactions; needed if region has not gotten updates
  // in a while. It will take care of not checking too frequently on store-by-store basis.
  this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
  this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
  this.leases = new Leases(this.threadWakeFrequency);
  // Create the thread to clean the moved regions list
  movedRegionsCleaner = MovedRegionsCleaner.create(this);
  if (this.nonceManager != null) {
    // Create the scheduled chore that cleans up nonces.
    nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
  }
  // Setup the Quota Manager
  rsQuotaManager = new RegionServerQuotaManager(this);
  // Setup RPC client for master communication
  rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
      rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
  boolean onlyMetaRefresh = false;
  int storefileRefreshPeriod = conf.getInt(
      StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
  if (storefileRefreshPeriod == 0) {
    storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    onlyMetaRefresh = true;
  }
  if (storefileRefreshPeriod > 0) {
    this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
        onlyMetaRefresh, this, this);
  }
  registerConfigurationObservers();
}
```
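Several of these components, such as `CompactionChecker` and `PeriodicMemstoreFlusher`, are periodic tasks that wake up every `threadWakeFrequency` milliseconds. The general shape of such a chore can be sketched with a standard `ScheduledExecutorService`; this is an illustration under that assumption, not HBase's `ChoreService`, and the class name is made up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a periodic background check in the spirit of CompactionChecker or
 * PeriodicMemstoreFlusher: wake up every wakeFrequencyMs and run one pass.
 * Built on a plain ScheduledExecutorService, not on HBase's ChoreService.
 */
class PeriodicChoreSketch {
  private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(r -> {
    Thread t = new Thread(r, "periodic-chore-sketch");
    t.setDaemon(true); // do not keep the JVM alive just for this chore
    return t;
  });
  private final AtomicInteger completedPasses = new AtomicInteger();

  void start(long wakeFrequencyMs, Runnable onePass) {
    pool.scheduleAtFixedRate(() -> {
      try {
        onePass.run();
      } catch (RuntimeException e) {
        // An exception escaping here would silently cancel all future runs, so report and continue.
        System.err.println("chore pass failed: " + e);
      }
      completedPasses.incrementAndGet();
    }, 0, wakeFrequencyMs, TimeUnit.MILLISECONDS);
  }

  int completedPasses() {
    return completedPasses.get();
  }

  void stop() {
    pool.shutdownNow();
  }
}
```

The single-threaded scheduler guarantees that passes never overlap, which matches the intent of a "check every N milliseconds" background thread.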
That's all for this part. Previous chapter: HBase Source Code Analysis (11): The WAL Write Process http://blog.csdn.net/chenfenggang/article/details/75142075