您的位置:首页 > 其它

rocketmq 4.0 Broker启动流程分析

2017-06-17 00:00 701 查看
摘要: BrokerController创建与初始化

在一次网络故障中引发的消息重发而导致深度研究源码的瞎想!!!回想当时整个过程是这样的:项目部署的主从,当天下午主网络问题,然后停掉主和从,将从改为主,启动后未发生任何重发消息;我当时建议把以前的主改为从启动起来,运维人员说以前主配置内存大些,然后就停掉了现在的一主,将其又改为从,以前的那个作为主又启动起来,后续问题出现了,结果大量消息重新消费;再由之有些业务并未做去重处理,显然整个数据就紊乱了。

注:消息服务做了补偿机制,即使宕机也不会影响业务端流程正常运行和数据丢失现象。

我查看了很久,没有看出任何问题,检查了所有日志输出信息也未发现问题,后来的推断应该是consumerOffset.json文件或者其他文件消费索引文件给重置了,然后把消息重新尝试发了一遍,建立消费索引。废话少说,以下我们来分析broker源码启动流程,来寻找真正的原因。

配置加载和BrokerController创建

public static BrokerController createBrokerController(String[] args) {
// 设置rocketmq.remoting.version变量到系统
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

// 检查netty远程通讯缓冲大小配置并初始化
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}

if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}

try {
// 创建命令行参数:如help, mqbroker等命令,设置namesrvAddr-->n, h-->help等后续会用到
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}

// 初始化Broker配置
final BrokerConfig brokerConfig = new BrokerConfig();
// 初始化netty客户端和服务器配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(10911);
// 初始化消息存储位置和rocket所有启动broker的加载的properties配置信息
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

// 默认初始化是为异步复制
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}

if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, brokerConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
MixAll.printObjectProperties(null, nettyClientConfig);
MixAll.printObjectProperties(null, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
MixAll.printObjectPropert
3ff0
ies(null, brokerConfig, true);
MixAll.printObjectProperties(null, nettyServerConfig, true);
MixAll.printObjectProperties(null, nettyClientConfig, true);
MixAll.printObjectProperties(null, messageStoreConfig, true);
System.exit(0);
}

// 启动时设置了 -c /conf/2m-2s-sync/broker-a.properties文件加载目录
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
// broker-a.properties文件
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
// 加载文件到属性集合中
properties = new Properties();
properties.load(in);

// 设置配置对象值
parsePropertie2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);

BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}

// 将nameSrv和file路径设置到BrokerConfig对象中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

//brokerConfig.setRocketmqHome("H:/open_source_project/incubator-rocketmq");
// rocketmqHome为空则退出程序
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation");
System.exit(-2);
}

String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}

// 设置主从BrokerId参数
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}

break;
default:
break;
}

// 设置HA监听端口10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
// 设置日志工厂对象,主要用于Logger实例化
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

//
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);

// 创建并初始化BrokerController对象
final BrokerController controller = new BrokerController(//
brokerConfig, //
nettyServerConfig, //
nettyClientConfig, //
messageStoreConfig);
// 合并当前所有配置信息防止后续丢失
controller.getConfiguration().registerConfig(properties);

// 初始化加载本地store等系统索引,消费等文件
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);

@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long begineTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - begineTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));

return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}


BrokerController初始化和文件恢复加载等

public boolean initialize() throws CloneNotSupportedException {
boolean result = true;
// 加载config\topics.json文件,若文件为空则加载config\topics.json.bak文件
result = result && this.topicConfigManager.load();
// 加载config\consumerOffset.json文件,若文件为空则加载config\consumerOffset.json.bak文件
result = result && this.consumerOffsetManager.load();
// 加载config\subscriptionGroup.json文件,若文件为空则加载config\subscriptionGroup.json.bak文件
result = result && this.subscriptionGroupManager.load();

if (result) {
try {
// 初始化默认的DefaultMessageStore对象
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//加载插件,此处默认为空
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
} catch (IOException e) {
result = false;
e.printStackTrace();
}
}

//      1.加载延迟delayOffset.json文件
//        --加载config\delayOffset.json文件,若文件为空则加载config\delayOffset.json.bak文件
//        --设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
//      2.加载提交commitlog目录日志
//        --commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
//      3.加载consumequeue目录文件,消费队列
//        --按照topic主题和设置的队列数来加载
//      4.设置checkpoint文件位置信息
//        --index目录下的所有文件
//        --如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
//        --然后加载到索引文件集合中
//        --恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
//        --恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}
result = result && this.messageStore.load();

if (result) {
// 设置netty远程通讯服务,端口为10909
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// 发送消息自定义线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// 拉取消息自定义线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
// 管理员Broker自定义线程池
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
// 客户端管理自定义线程池
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
// 消息管理自定义线程池
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// 注册消息处理器
this.registerProcessor();

// TODO remove in future
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Exception e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Exception e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}

return result;
}

初始化过程如下:

1.加载延迟delayOffset.json文件
--加载config\delayOffset.json文件,若文件
3ff0
为空则加载config\delayOffset.json.bak文件
--设置默认等级参数(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 4h 6h 12h 1d 2d)
2.加载提交commitlog目录日志
--commitlog目录下所有日志信息并写,刷新,提交的。例如:00000000000000000000文件
3.加载consumequeue目录文件,消费队列
--按照topic主题和设置的队列数来加载
4.设置checkpoint文件位置信息
--index目录下的所有文件
--如果rocketmq非正常退出,则根据当前启动Checkpoint的时间和最后一次时间比较,若最后一次时间大于当前时间则直接删除该文件,否则保留文件
--然后加载到索引文件集合中
--恢复消费队列,如果是正常退出,所有的内存数据都将被刷新进行数据恢复,若非正常退出,恢复正常的数据,错误的数据则丢失
--恢复topic队列,设置commitLog对象topic消费记录。例如:{topic_1-1=250, topic_1-0=250, topic_1-3=249, topic_1-2=251}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RocketMQ