rocketmq 4.0 Broker启动流程分析
2017-06-17 00:00
701 查看
摘要: BrokerController创建与初始化
在一次网络故障中引发的消息重发而导致深度研究源码的瞎想!!!回想当时整个过程是这样的:项目部署的主从,当天下午主网络问题,然后停掉主和从,将从改为主,启动后未发生任何重发消息;我当时建议把以前的主改为从启动起来,运维人员说以前主配置内存大些,然后就停掉了现在的一主,将其又改为从,以前的那个作为主又启动起来,后续问题出现了,结果大量消息重新消费;再由之有些业务并未做去重处理,显然整个数据就紊乱了。
注:消息服务做了补偿机制,即使宕机也不会影响业务端流程正常运行和数据丢失现象。
我查看了很久,没有看出任何问题,检查了所有日志输出信息也未发现问题,后来的推断应该是consumerOffset.json文件或者其他文件消费索引文件给重置了,然后把消息重新尝试发了一遍,建立消费索引。废话少说,以下我们来分析broker源码启动流程,来寻找真正的原因。
初始化过程如下:
1.加载延迟delayOffset.json文件
--加载config\delayOffset.json文件,若文件
3ff0
为空则加载config\delayOffset.json.bak文件
--设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
2.加载提交commitlog目录日志
--commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
3.加载consumequeue目录文件,消费队列
--按照topic主题和设置的队列数来加载
4.设置checkpoint文件位置信息
--index目录下的所有文件
--如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
--然后加载到索引文件集合中
--恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
--恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}
在一次网络故障中引发的消息重发而导致深度研究源码的瞎想!!!回想当时整个过程是这样的:项目部署的主从,当天下午主网络问题,然后停掉主和从,将从改为主,启动后未发生任何重发消息;我当时建议把以前的主改为从启动起来,运维人员说以前主配置内存大些,然后就停掉了现在的一主,将其又改为从,以前的那个作为主又启动起来,后续问题出现了,结果大量消息重新消费;再由之有些业务并未做去重处理,显然整个数据就紊乱了。
注:消息服务做了补偿机制,即使宕机也不会影响业务端流程正常运行和数据丢失现象。
我查看了很久,没有看出任何问题,检查了所有日志输出信息也未发现问题,后来的推断应该是consumerOffset.json文件或者其他文件消费索引文件给重置了,然后把消息重新尝试发了一遍,建立消费索引。废话少说,以下我们来分析broker源码启动流程,来寻找真正的原因。
配置加载和BrokerController创建
public static BrokerController createBrokerController(String[] args) { // 设置rocketmq.remoting.version变量到系统 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // 检查netty远程通讯缓冲大小配置并初始化 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 131072; } if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } try { // 创建命令行参数:如help, mqbroker等命令,设置namesrvAddr-->n, h-->help等后续会用到 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } // 初始化Broker配置 final BrokerConfig brokerConfig = new BrokerConfig(); // 初始化netty客户端和服务器配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); nettyServerConfig.setListenPort(10911); // 初始化消息存储位置和rocket所有启动broker的加载的properties配置信息 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // 默认初始化是为异步复制 if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, brokerConfig); MixAll.printObjectProperties(null, nettyServerConfig); MixAll.printObjectProperties(null, nettyClientConfig); MixAll.printObjectProperties(null, messageStoreConfig); System.exit(0); } else if (commandLine.hasOption('m')) { MixAll.printObjectPropert 3ff0 ies(null, brokerConfig, true); MixAll.printObjectProperties(null, nettyServerConfig, true); MixAll.printObjectProperties(null, nettyClientConfig, true); MixAll.printObjectProperties(null, messageStoreConfig, true); System.exit(0); } // 启动时设置了 -c /conf/2m-2s-sync/broker-a.properties文件加载目录 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { // broker-a.properties文件 configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); // 加载文件到属性集合中 properties = new Properties(); properties.load(in); // 设置配置对象值 parsePropertie2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } // 将nameSrv和file路径设置到BrokerConfig对象中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); //brokerConfig.setRocketmqHome("H:/open_source_project/incubator-rocketmq"); // rocketmqHome为空则退出程序 if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation"); System.exit(-2); } String namesrvAddr = brokerConfig.getNamesrvAddr(); if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); for (String addr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } } // 设置主从BrokerId参数 switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; case SLAVE: if (brokerConfig.getBrokerId() <= 0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } // 设置HA监听端口10912 messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); // 设置日志工厂对象,主要用于Logger实例化 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); // MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); // 创建并初始化BrokerController对象 final BrokerController controller = new BrokerController(// brokerConfig, // nettyServerConfig, // nettyClientConfig, // messageStoreConfig); // 合并当前所有配置信息防止后续丢失 controller.getConfiguration().registerConfig(properties); // 初始化加载本地store等系统索引,消费等文件 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long begineTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - begineTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
BrokerController初始化和文件恢复加载等
public boolean initialize() throws CloneNotSupportedException { boolean result = true; // 加载config\topics.json文件,若文件为空则加载config\topics.json.bak文件 result = result && this.topicConfigManager.load(); // 加载config\consumerOffset.json文件,若文件为空则加载config\consumerOffset.json.bak文件 result = result && this.consumerOffsetManager.load(); // 加载config\subscriptionGroup.json文件,若文件为空则加载config\subscriptionGroup.json.bak文件 result = result && this.subscriptionGroupManager.load(); if (result) { try { // 初始化默认的DefaultMessageStore对象 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //加载插件,此处默认为空 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); } catch (IOException e) { result = false; e.printStackTrace(); } } // 1.加载延迟delayOffset.json文件 // --加载config\delayOffset.json文件,若文件为空则加载config\delayOffset.json.bak文件 // --设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d) // 2.加载提交commitlog目录日志 // --commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件 // 3.加载consumequeue目录文件,消费队列 // --按照topic主题和设置的队列数来加载 // 4.设置checkpoint文件位置信息 // --index目录下的所有文件 // --如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件 // --然后加载到索引文件集合中 // --恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失 // --恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251} result = result && this.messageStore.load(); if (result) { // 设置netty远程通讯服务,端口为10909 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); // 发送消息自定义线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); // 拉取消息自定义线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); // 管理员Broker自定义线程池 this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); // 客户端管理自定义线程池 this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); // 消息管理自定义线程池 this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); // 注册消息处理器 this.registerProcessor(); // TODO remove in future final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Exception e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Exception e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } else { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } } return result; }
初始化过程如下:
1.加载延迟delayOffset.json文件
--加载config\delayOffset.json文件,若文件
3ff0
为空则加载config\delayOffset.json.bak文件
--设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
2.加载提交commitlog目录日志
--commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
3.加载consumequeue目录文件,消费队列
--按照topic主题和设置的队列数来加载
4.设置checkpoint文件位置信息
--index目录下的所有文件
--如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
--然后加载到索引文件集合中
--恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
--恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}
相关文章推荐
- RocketMQ runbroker.sh 分析JVM启动参数
- 查看RocketMQ的broker启动部分源码分析总结
- RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考
- RocketMQ原理解析-producer 1.启动流程
- Rocketmq Broker启动网卡顺序问题
- RocketMQ原理解析-broker 1. broker的启动
- RocketMQ源码分析----Broker处理发送请求
- RocketMQ源码:Producer启动分析
- RocketMQ原理解析-broker 1. broker的启动
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- u-boot总的启动流程代码分析
- [转载] linux启动流程分析(4)---汇编部分(4)
- [转载] linux启动流程分析(4)---汇编部分(1)
- spring源码之旅(2)_applicationcontext启动流程分析
- [转载] linux启动流程分析(4)---汇编部分(5)
- 14.3 U-Boot启动流程分析
- Linux系统分析之启动流程
- [转载] linux启动流程分析(2)---内核启动地址的确定
- [转载] linux启动流程分析(1)---bootloader启动内核过程
- Linux系统分析之启动流程