RocketMQ Source Code Analysis: rocketmq-broker Startup (2)
2018-08-06 23:11
Copyright notice: this blog is original technical content. Do not repost without permission. https://blog.csdn.net/yewandemty/article/details/81463165
Figure 1. Broker startup flowchart
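Managers loaded by the brokerController during initialize:
- topicManager: manages the configuration of all topics stored on the broker
- consumerOffsetManager: manages the consumption progress of consumers
- subscriptionGroupManager: manages subscription groups, including subscription permissions
- messageStore: handles persistent storage of messages at the broker layer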
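Lines 14-19 of createBrokerController (section 2.2): buildCommandlineOptions() builds the Options used for command-line parsing. The h (help) and n (namesrvAddr) options are already registered at this point, and c (configFile), p (printConfigItem) and m (printImportantConfig) are added afterwards. parseCmdLine() then parses the mqbroker launch command (on *nix systems), for example: nohup ./mqbroker -n 192.168.2.1:9876. If the parsed commandLine is null, the process exits with System.exit(-1).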
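Lines 35-52: if the launch command includes the -c option, the file it names is loaded into a Properties object. MixAll.properties2Object() then copies the loaded values into the brokerConfig, nettyServerConfig, nettyClientConfig and messageStoreConfig objects, and finally the config file path is saved in the brokerConfigPath variable of BrokerPathConfigHelper.
Line 54: the remaining command-line options are converted to Properties and applied to the brokerConfig object.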
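To make the properties-to-object step more concrete, here is a minimal sketch of reflection-based binding in the spirit of MixAll.properties2Object(). It is not RocketMQ's implementation: PropertiesBinder and DemoConfig are hypothetical names, and only a few basic types are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical helper for illustration, not part of RocketMQ.
class PropertiesBinder {

    // Hypothetical stand-in for a config class such as BrokerConfig.
    static class DemoConfig {
        private String brokerName = "DEFAULT";
        private int listenPort = 10911;

        public String getBrokerName() { return brokerName; }
        public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    }

    // For every public single-argument setXxx on target, look up property "xxx" and apply it.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // no such property: keep the default held by the object
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == String.class) {
                arg = value;
            } else if (type == int.class || type == Integer.class) {
                arg = Integer.parseInt(value);
            } else if (type == long.class || type == Long.class) {
                arg = Long.parseLong(value);
            } else if (type == boolean.class || type == Boolean.class) {
                arg = Boolean.parseBoolean(value);
            } else {
                continue; // type not supported by this sketch
            }
            try {
                m.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }
}
```

In this sketch the same Properties object can be applied to several config objects in turn, because each object only picks up the keys that match its own setters.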
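Lines 62-75: namesrvAddr is read from brokerConfig and split on ";", and each address is passed to RemotingUtil.string2SocketAddress() to convert it to a SocketAddress. The conversion serves as a format check: if it fails, the broker prints the expected format and exits with System.exit(-3).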
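The address check can be sketched with the JDK alone. The class below is hypothetical (it does not reproduce RemotingUtil.string2SocketAddress) and throws IllegalArgumentException on a malformed entry, where the broker prints a message and exits.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration, not part of RocketMQ.
class NamesrvAddrParser {

    // Parses "host1:port1;host2:port2" into socket addresses without doing DNS lookups.
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String addr : namesrvAddr.split(";")) {
            String trimmed = addr.trim();
            int idx = trimmed.lastIndexOf(':');
            if (idx <= 0 || idx == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + addr);
            }
            String host = trimmed.substring(0, idx);
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(idx + 1));
            // createUnresolved rejects ports outside 0..65535 with IllegalArgumentException.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```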
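Lines 77-91: the broker role (master or slave) determines brokerId. For SYNC_MASTER and ASYNC_MASTER, brokerId is set to MixAll.MASTER_ID (0). For SLAVE, brokerId must be > 0; if it is <= 0, the broker prints "Slave's brokerId must be > 0" and exits with System.exit(-3) (no exception is thrown).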
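The role rule is small enough to sketch as a pure function. BrokerIdRule and its Role enum are hypothetical names used only here; the sketch throws on an invalid slave id, whereas the broker itself calls System.exit(-3).

```java
// Hypothetical helper for illustration, not part of RocketMQ.
class BrokerIdRule {

    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    static final long MASTER_ID = 0L; // masters always use id 0

    // Returns the brokerId that should be in effect for the given role.
    static long resolveBrokerId(Role role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                return MASTER_ID; // any configured id is overridden
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId;
            default:
                return configuredId;
        }
    }
}
```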
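Lines 116-121: obtains the broker logger and writes the properties of the brokerConfig, nettyServerConfig, nettyClientConfig and messageStoreConfig objects to the log.
Lines 122-128: creates the BrokerController from the brokerConfig, nettyServerConfig, nettyClientConfig and messageStoreConfig objects, then registers the values held in properties with the controller's Configuration via registerConfig().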
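Lines 136-159: Runtime.getRuntime().addShutdownHook() registers work that must be done before the JVM exits. Here the hook calls controller.shutdown() to close the brokerController; the hasShutdown flag ensures the shutdown runs only once, and the elapsed time is logged.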
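A minimal sketch of a run-once shutdown hook follows. OnceShutdown is a hypothetical class, and it swaps the synchronized block plus volatile flag of the original for AtomicBoolean.compareAndSet, so a second caller returns immediately instead of waiting for the first shutdown to finish.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration, not part of RocketMQ.
class OnceShutdown implements Runnable {
    private final Runnable shutdownAction;
    private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
    private final AtomicInteger invocations = new AtomicInteger(0);

    OnceShutdown(Runnable shutdownAction) {
        this.shutdownAction = shutdownAction;
    }

    @Override
    public void run() {
        invocations.incrementAndGet();
        // Only the first caller flips the flag and performs the shutdown.
        if (hasShutdown.compareAndSet(false, true)) {
            shutdownAction.run();
        }
    }

    int invocations() {
        return invocations.get();
    }

    // Registers the action as a JVM shutdown hook and returns the hook thread.
    static Thread register(Runnable shutdownAction) {
        Thread hook = new Thread(new OnceShutdown(shutdownAction), "ShutdownHook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```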
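This article covers the rocketmq-broker startup flow in the following parts:
1. Broker overview
1.1 initialize
1.2 start
2. Broker startup code analysis
2.1 rocketmq-broker startup entry point
2.2 Creating the BrokerController with createBrokerController
2.3 Initializing with the initialize method
2.4 Starting the broker with start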
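1. Broker overview
The broker is the core module that receives, processes and persists messages. It accepts the messages sent by producers and serves the messages that consumers consume.
Broker startup consists of two main steps: initialize and start.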
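1.1 initialize
1) On startup the broker takes the initialization parameters passed in from outside and also loads configuration parameters from the config file, storing them in the corresponding Configuration classes. These classes hold the following information:
- Broker configuration: root directory, namesrv address, broker IP and name, number of message queues, thread pool sizes for sending and receiving messages, and so on
- Netty configuration: listen port, number of worker threads, semaphore count for asynchronous sends, and so on
- Storage configuration: storage root directory, CommitLog settings, persistence (flush) strategy, and so on
2) Once the configuration is set, the broker passes it to the brokerController, which initializes and loads a number of managers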
- For the RocketMQ config file settings, see the following link
https://www.geek-share.com/detail/2703481946.html - RocketMQ config file settings
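3) Once all managers are loaded, the controller moves on to the next initialization step
- Create the Netty servers that receive messages (they are actually started later, in start)
- Initialize several thread pools, including sendMessageExecutor, pullMessageExecutor, adminBrokerExecutor and clientManageExecutor, which handle sending messages, pulling messages, broker administration and client management respectively. Requests are submitted to these pools so that they can be processed concurrently.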
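As a rough illustration of such a pool, the sketch below builds a fixed-size ThreadPoolExecutor whose threads carry a recognizable name prefix, using only the JDK. NamedPools and its methods are hypothetical; RocketMQ uses its own BrokerFixedThreadPoolExecutor and ThreadFactoryImpl classes, which are not reproduced here.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration, not part of RocketMQ.
class NamedPools {

    // Creates threads named prefix + 1, prefix + 2, ... so they are easy to spot in thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicLong index = new AtomicLong(0);
        return runnable -> new Thread(runnable, prefix + index.incrementAndGet());
    }

    // Core size equals max size, so the pool never grows beyond `threads`;
    // tasks that cannot run yet wait in a bounded queue.
    static ThreadPoolExecutor fixedPool(int threads, int queueCapacity, String prefix) {
        return new ThreadPoolExecutor(
            threads,
            threads,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueCapacity),
            namedFactory(prefix));
    }
}
```

Giving each pool its own prefix (for example "SendMessageThread_" versus "PullMessageThread_") makes it straightforward to tell from a thread dump which kind of request is occupying the broker.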
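1.2 start
Once the broker has initialized its configuration, it can be started
- Start the components initialized above: topicManager, consumerOffsetManager, subscriptionGroupManager, messageStore
- Start a scheduled task that reports the broker's own information to the namesrv every 30s
- Start a thread that cleans up unused topics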
2. Broker startup code analysis
2.1 rocketmq-broker startup entry point
    public class BrokerStartup {
        //...
        public static void main(String[] args) {
            start(createBrokerController(args));
        }
        //...
    }
- Lines 3-5: the broker entry point, which consists of two parts: createBrokerController(args) and start(brokerController)
2.2 Creating the BrokerController with createBrokerController
    public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
            //PackageConflictDetect.detectFastjson();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();

            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
                    + " variable in your environment to match the location of the RocketMQ installation");
                System.exit(-2);
            }

            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }

            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }

            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

            if (commandLine.hasOption('p')) {
                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                MixAll.printObjectProperties(console, nettyClientConfig);
                MixAll.printObjectProperties(console, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig, true);
                MixAll.printObjectProperties(console, nettyServerConfig, true);
                MixAll.printObjectProperties(console, nettyClientConfig, true);
                MixAll.printObjectProperties(console, messageStoreConfig, true);
                System.exit(0);
            }

            log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);

            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
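- Line 2: sets the RocketMQ version information
- Lines 4-10: checks whether the system properties for the remoting socket send and receive buffer sizes are set; if they are not, both sizes default to 131072 bytes (128 KiB)
  - NettySystemConfig.socketSndbufSize = 131072;
  - NettySystemConfig.socketRcvbufSize = 131072;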
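The "use the system property if present, otherwise fall back to a default" pattern can be sketched as below. SocketBufferDefaults and the property key used in the usage line are hypothetical; NettySystemConfig itself is not reproduced.

```java
// Hypothetical helper for illustration, not part of RocketMQ.
class SocketBufferDefaults {

    static final int DEFAULT_SIZE = 131072; // 128 KiB, the value used in createBrokerController

    // Returns the size configured via -D<propertyKey>=..., or the default when the property is unset.
    static int resolve(String propertyKey) {
        String raw = System.getProperty(propertyKey);
        return raw == null ? DEFAULT_SIZE : Integer.parseInt(raw);
    }
}
```

For example, `SocketBufferDefaults.resolve("demo.socket.sndbuf.size")` yields 131072 unless the JVM was started with `-Ddemo.socket.sndbuf.size=<bytes>`.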
2.3 Initializing with the initialize method
    public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();

        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        result = result && this.messageStore.load();

        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));

            this.registerProcessor();

            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }
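- Lines 2-6: loads topicConfigManager, consumerOffsetManager, subscriptionGroupManager and consumerFilterManager, recording in result whether every load succeeded
- Lines 8-22: if the loads above succeeded, constructs the messageStore object. After the plugin context is created, MessageStoreFactory.build() processes the messageStore again, and finally the load() method loads the messageStore data
- Lines 26-191 (the second if (result) block): checks whether loading succeeded. If result is false, the method simply returns false; if result is true:
1) Creates the executors sendMessageExecutor, pullMessageExecutor, queryMessageExecutor, adminBrokerExecutor, clientManageExecutor and consumerManageExecutor
2) Calls registerProcessor() to register the request processors, together with the executors just created, on the remotingServer and fastRemotingServer objects
3) Uses the scheduledExecutorService (a ScheduledExecutorService) to run the following tasks periodically
- BrokerController.this.getBrokerStats().record(); records the broker's statistics, once every 24 hours starting from the next morning
- BrokerController.this.consumerOffsetManager.persist(); persists the consumers' current offsets, every flushConsumerOffsetInterval milliseconds
- BrokerController.this.consumerFilterManager.persist(); persists the consumer filter manager data, every 10s
- BrokerController.this.protectBroker(); runs the broker protection check, every 3 minutes
- BrokerController.this.printWaterMark(); prints the watermark log, every second
- BrokerController.this.getMessageStore().dispatchBehindBytes() logs how many bytes dispatch is behind the commit log, every 60s
- BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); fetches the namesrv address every 2 minutes, only when no namesrvAddr is configured and fetchNamesrvAddrByAddressServer is enabled
- BrokerController.this.slaveSynchronize.syncAll(); on a slave broker, synchronizes data from the master every 60s
- BrokerController.this.printMasterAndSlaveDiff(); on a master broker, prints the difference between master and slave every 60s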
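Every task scheduled in initialize wraps its body in try/catch (Throwable). That matters because with ScheduledExecutorService.scheduleAtFixedRate, a task that throws has all of its subsequent executions suppressed. The sketch below factors the pattern into a wrapper; GuardedSchedule is a hypothetical helper, and it reports to System.err where the broker uses its logger.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration, not part of RocketMQ.
class GuardedSchedule {

    // Wraps a task so that a failure is reported instead of escaping to the scheduler.
    static Runnable guarded(String name, Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable e) {
                System.err.println(name + " error: " + e);
            }
        };
    }

    // Schedules the guarded task; one failed run no longer cancels the following runs.
    static ScheduledFuture<?> scheduleGuarded(ScheduledExecutorService scheduler, String name,
        Runnable task, long initialDelayMs, long periodMs) {
        return scheduler.scheduleAtFixedRate(guarded(name, task), initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```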
2.4 Starting the broker with start
    public static BrokerController start(BrokerController controller) {
        try {
            controller.start();
            // ...
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
- Line 3: starts the brokerController
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        this.registerBrokerAll(true, false);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
        //...
    }
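- Lines 2-28: starts each of messageStore, remotingServer, fastRemotingServer, brokerOuterAPI, pullRequestHoldService, clientHousekeepingService and filterServerManager, provided it is not null
- Lines 30-40: calls registerBrokerAll() once immediately, then schedules it to run every 30s (after an initial delay of 10s) so that the broker keeps reporting its own information to the namesrv; unused topics are also cleaned up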
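The "register once now, then keep registering on a fixed period" shape of start() can be sketched with the JDK scheduler. Heartbeat is a hypothetical class and the register callback stands in for registerBrokerAll(true, false), whose internals are not shown in this article.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration, not part of RocketMQ.
class Heartbeat {

    // Runs `register` synchronously once, then repeats it at a fixed rate.
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, Runnable register,
        long initialDelayMs, long periodMs) {
        register.run(); // first registration happens before start() returns
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                register.run();
            } catch (Throwable e) {
                // Swallow and report, so one failed heartbeat does not cancel the schedule.
                System.err.println("register error: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

With the values from the listing, the call would be `Heartbeat.start(scheduler, register, 1000 * 10, 1000 * 30)`. Registering once up front means the name server learns about the broker right away instead of only after the first scheduled run.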
That concludes the source code analysis of broker startup. As my understanding improves, I will revise this article accordingly.