RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考
2017-07-22 18:58
1201 查看
1、Broker概述
Broker在RocketMQ架构中的角色,就是存储消息,核心任务就是持久化消息,生产者发送消息给Broker,消费者从Broker消费消息。
以上摘录自官方RocketMQ设计文档
上述基本描述了消息中间件的架构设计,不仅限于RocketMQ,不同消息中间件的最大区别之一在消息的存储上。
2、Broker存储设计概要
接下来从配置文件的角度来窥探Broker存储设计的关注点:
代码来源于org.apache.rocketmq.store.config.MessageStoreConfig.MessageStoreConfig
1、storePathRootDir 设置Broker的存储根目录,默认为 $Broker_Home/store
2、storePathCommitLog 设置commitlog的存储目录,默认为$Broker_Home/store/commitlog
3、mapedFileSizeCommitLog commitlog文件的大小,默认为1G
4、mapedFileSizeConsumeQueue consumeQueueSize,ConsumeQueue存放的是定长的信息(20个字节,偏移量、size、tagscode),默认30w
* ConsumeQueue.CQ_STORE_UNIT_SIZE。
5、enableConsumeQueueExt 是否开启consumeQueueExt,默认为false,就是如果消费端消息消费速度跟不上,是否创建一个扩展的ConsumeQueue文件,如果不开启,应该会阻塞从commitlog文件中获取消息,并且ConsumeQueue,应该是按topic独立的。
6、mappedFileSizeConsumeQueueExt,扩展consume文件的大小,默认为48M
7、bitMapLengthConsumeQueueExt 待定
8、flushIntervalCommitLog 刷写CommitLog的间隔时间,RocketMQ后台会启动一个线程,将消息刷写到磁
盘,这个也就是该线程每次运行后等待的时间,默认为500毫秒。flush操作,调用文件通道的force()方法
9、commitIntervalCommitLog 提交消息到CommitLog对应的文件通道的间隔时间,原理与上面类似;将消息写入到文件通道(调用FileChannel.write方法)得到最新的写指针,默认为200毫秒
10、useReentrantLockWhenPutMessage 在put message( 将消息按格式封装成msg放入相关队列时实用的锁机制:自旋或ReentrantLock)
11、flushIntervalConsumeQueue 刷写到ConsumeQueue的间隔,默认为1s
12、flushCommitLogLeastPages 每次flush commitlog时最小发生变化的页数,如果不足该值,本次不进行刷写操作
13、commitCommitLogLeastPages 同上
14、flushLeastPagesWhenWarmMapedFile 同上
15、flushConsumeQueueLeastPages 同上
16、putMsgIndexHightWater 流量控制参数
本次重点关注上述参数,该参数基本控制了生产者--》Broker ---> 消费者相关机制。
接下来从如下方面去深入其实现:
1)生产者发送消息
2)消息协议(格式)
3)消息存储、检索
4)消费队列维护
5)消息消费、重试等机制
2.1 消息发送
2.1.1 消息发送参数详解:
1)Message msg
2)communicationMode communicationMode
发送方式,SYNC(同步)、ASYNC(异步)、ONEWAY(单向,不关注返回)
3)发送回调类(发送成功或发送失败)
4)timeout 消息发送超时时间
2.2.2 消息发送流程
默认消息发送实现:
1)获取topic的发布信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
2)循环 start
3)根据topic负载均衡算法选择一个MessageQueue
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
4)向MessageQueue发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
5)更新失败策略
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
6)如果是同步调用方式(SYNC),则执行失败重试,否则直接结束
7)循环end
2.2.2.1 获取topic的路由信息
首先我们来思考一下,topic的路由信息包含哪些内容
消息的发布与订阅基于topic,路由发布信息以topic维度进行描述
Broker负载消息存储,一个topic可以分布在多台Broker上(负载均衡),每个Broker包含多个Queue。队列元数据基于一个Broker来描述(QueueData:所在BrokerName,读队列个数,写队列个数、权限、同步或异步)
接下来先源码分析tryToFindTopicPublishInfo:获取该Topic的路由信息,基于该信息发送到具体的Broker的MessageQueue上。
源码分析 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl tryToFindTopicPublishInfo
代码@1,从本地缓存(ConcurrentMap<String/* topic */, TopicPublishInfo>)中尝试获取,第一次肯定为空,走代码@2的流程
接下来的落点都是通过updateTopicRouteInfoFromNameServer方法去NameServer获取配置信息并更新本地缓存配置
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
通过上面的实现,首先默认是使用topic去查找配置,如果尝试找不到,则再使用默认的topic去找路由配置信息。
代码@2,@3,@4的关键落点在public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
,接下来我们重点关注updateTopicRouteInfoFromNameServer方法
org.apache.rocketmq.client.impl.factory.MQClientInstance updateTopicRouteInfoFromNameServer源码分析:
代码@1,为了避免重复从NameServer获取配置信息,在这里使用了ReentrantLock,并且设有超时时间。固定为3000s。
代码@2,@3的区别,一个是获取默认topic的配置信息,一个是获取指定topic的配置信息,该方法在这里就不跟踪进去了,具体的实现就是通过与NameServer的长连接Channel发送GET_ROUTEINTO_BY_TOPIC(105)命令,获取配置信息。注意,次过程的超时时间为3s,由此可见,NameServer的实现要求高效。
代码@4、@5、@6,从这里开始,拿到最新的topic发布信息后,需要与本地缓存中的topic发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该topic的缓存。
代码@7 更新发送者的缓存
代码@8,更新订阅者的缓存(消费队列信息)
至此tryToFindTopicPublishInfo运行完毕,从NameServer获取TopicPublishData,继续消息发送的第二个步骤,选取一个消息队列。
2.2.2.2 获取MessageQueue
核心源码:DefaultMQProducerImpl.sendDefaultImpl
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
代码@2-start--end,这里使用了本地线程变量ThreadLocal保存上一次发送的消息队列下标,消息发送使用轮询机制获取下一个发送消息队列。
看到没,代码@2对topic所有的消息队列进行一次,为什么要循环呢?因为加入了发送异常延迟,要确保选中的消息队列(MessageQueue)所在的Broker是正常的。
代码@3,判断当前的消息队列是否可用。
要理解代码@2,@3 处的逻辑,我们就需要理解RocketMQ 发送消息延迟机制,具体实现类:org.apache.rocketmq.client.latency.MQFaultStrategy
计算出来的延迟值+加上本次消息的延迟值,设置为FaultItem的startTimestamp,表示当前时间必须大于该startTimestamp时,该broker才重新参与MessageQueue的选择。
从@2--@3,一旦一个MessageQueue符合条件,即刻返回,但该Topic所在的所有Broker全部标记不可用时,进入到下一步逻辑处理。(在此处,我们要知道,标记为不可用,并不代表真的不可用,Broker是可用在故障期间被运营管理人员进行恢复的,比如重启)
代码@4,5,就是根据Broker的startTimestart进行一个排序,值越小,排前面,然后再选择一个,返回(此时不能保证一定可用,会抛出异常,如果消息发送方式是同步调用,则有重试机制)
接下来将进入到消息发送的第三步,发现消息。
2.2.2.3 根据MessageQueue向特定的Broker发送消息,源码:sendKernelImpl
本文不深入研究该方法,此刻理解为通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者。
值得注意的是,如果消息发送模式为(SYNC)同步调用时,在生产者实现这边默认提供重试机制,通过(retryTimesWhenSendFailed)参数设置,默认为2,表示重试2次,也就时最多运行3次。
上文主要分析了RocketMQ已同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。
思考题:
1、消息发送时时异常处理思路
1)NameServer挂了
2)Broker挂了
1、消息发送者在同一时刻持有NameServer集群中的一个连接用来及时获取broker等信息(topic路由信息),每一个Topic的队列分散在不同的Broker上,默认topic在Broker中对应4个发送队列,4个消息队列。
消息发送图解:
1、NameServer 挂机
在发送消息阶段,如果生产者本地缓存中没有缓存topic的路由信息,则需要从NameServer获取,只有当所有NameServer都不可用时,此时会抛MQClientException。如果所有的NameServer全部挂掉,并且生产者有缓存Topic的路由信息,此时依然可用发送消息。所以,NameServer的挂掉,通常不会对整个消息发送带来什么严重的问题。
2、Broker挂机
基础知识:消息生产者每隔30s,从NameServer处获取最新的Broker存活信息(topic路由信息)
Broker每30s,向所有的NameServer报告自己的情况,故Broker的down机,Product的最大可感知时间为60s,在这60s,消息发送会有什么影响呢?
此时分两种情况分别进行分析
1)启用sendLatencyFaultEnable
由于使用了故障延迟机制,详细原理见上文详解,会对获取的MQ进行可用性验证,比如获取一个MessageQueue,发送失败,这时会对该Broker进行标记,标记该Broker在未来的某段时间内不会被选择到,默认为(5分钟,不可改变),所有此时只有当该topic全部的broker挂掉,才无法发送消息,符合高可用设计。
2)不启用sendLatencyFaultEnable = false
此时会出现消息发送失败的情况,因为默认情况下,product每次发送消息,会采取轮询机制取下一个MessageQueue,由于可能该Message所在的Broker挂掉,会抛出异常。因为一个Broker默认为一个topic分配4个messageQueue,由于默认只重试2次,故消息有可能发送成功,有可能发送失败。
Broker在RocketMQ架构中的角色,就是存储消息,核心任务就是持久化消息,生产者发送消息给Broker,消费者从Broker消费消息。
以上摘录自官方RocketMQ设计文档
上述基本描述了消息中间件的架构设计,不仅限于RocketMQ,不同消息中间件的最大区别之一在消息的存储上。
2、Broker存储设计概要
接下来从配置文件的角度来窥探Broker存储设计的关注点:
代码来源于org.apache.rocketmq.store.config.MessageStoreConfig.MessageStoreConfig
1、storePathRootDir 设置Broker的存储根目录,默认为 $Broker_Home/store
2、storePathCommitLog 设置commitlog的存储目录,默认为$Broker_Home/store/commitlog
3、mapedFileSizeCommitLog commitlog文件的大小,默认为1G
4、mapedFileSizeConsumeQueue consumeQueueSize,ConsumeQueue存放的是定长的信息(20个字节,偏移量、size、tagscode),默认30w
* ConsumeQueue.CQ_STORE_UNIT_SIZE。
5、enableConsumeQueueExt 是否开启consumeQueueExt,默认为false,就是如果消费端消息消费速度跟不上,是否创建一个扩展的ConsumeQueue文件,如果不开启,应该会阻塞从commitlog文件中获取消息,并且ConsumeQueue,应该是按topic独立的。
6、mappedFileSizeConsumeQueueExt,扩展consume文件的大小,默认为48M
7、bitMapLengthConsumeQueueExt 待定
8、flushIntervalCommitLog 刷写CommitLog的间隔时间,RocketMQ后台会启动一个线程,将消息刷写到磁
盘,这个也就是该线程每次运行后等待的时间,默认为500毫秒。flush操作,调用文件通道的force()方法
9、commitIntervalCommitLog 提交消息到CommitLog对应的文件通道的间隔时间,原理与上面类似;将消息写入到文件通道(调用FileChannel.write方法)得到最新的写指针,默认为200毫秒
10、useReentrantLockWhenPutMessage 在put message( 将消息按格式封装成msg放入相关队列时实用的锁机制:自旋或ReentrantLock)
11、flushIntervalConsumeQueue 刷写到ConsumeQueue的间隔,默认为1s
12、flushCommitLogLeastPages 每次flush commitlog时最小发生变化的页数,如果不足该值,本次不进行刷写操作
13、commitCommitLogLeastPages 同上
14、flushLeastPagesWhenWarmMapedFile 同上
15、flushConsumeQueueLeastPages 同上
16、putMsgIndexHightWater 流量控制参数
本次重点关注上述参数,该参数基本控制了生产者--》Broker ---> 消费者相关机制。
接下来从如下方面去深入其实现:
1)生产者发送消息
2)消息协议(格式)
3)消息存储、检索
4)消费队列维护
5)消息消费、重试等机制
2.1 消息发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl sendDefaultImpl方法源码分析 rprivate SendResult sendDefaultImpl(// Message msg, // final CommunicationMode communicationMode, // final SendCallback sendCallback, // final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { }
2.1.1 消息发送参数详解:
1)Message msg
2)communicationMode communicationMode
发送方式,SYNC(同步)、ASYNC(异步)、ONEWAY(单向,不关注返回)
3)发送回调类(发送成功或发送失败)
4)timeout 消息发送超时时间
2.2.2 消息发送流程
默认消息发送实现:
private SendResult sendDefaultImpl(// Message msg, // final CommunicationMode communicationMode, // final SendCallback sendCallback, // final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
1)获取topic的发布信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
2)循环 start
3)根据topic负载均衡算法选择一个MessageQueue
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
4)向MessageQueue发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
5)更新失败策略
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
6)如果是同步调用方式(SYNC),则执行失败重试,否则直接结束
7)循环end
2.2.2.1 获取topic的路由信息
首先我们来思考一下,topic的路由信息包含哪些内容
消息的发布与订阅基于topic,路由发布信息以topic维度进行描述
Broker负载消息存储,一个topic可以分布在多台Broker上(负载均衡),每个Broker包含多个Queue。队列元数据基于一个Broker来描述(QueueData:所在BrokerName,读队列个数,写队列个数、权限、同步或异步)
接下来先源码分析tryToFindTopicPublishInfo:获取该Topic的路由信息,基于该信息发送到具体的Broker的MessageQueue上。
源码分析 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // @1 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // @2 topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { //@3 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //@4 topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
代码@1,从本地缓存(ConcurrentMap<String/* topic */, TopicPublishInfo>)中尝试获取,第一次肯定为空,走代码@2的流程
接下来的落点都是通过updateTopicRouteInfoFromNameServer方法去NameServer获取配置信息并更新本地缓存配置
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
通过上面的实现,首先默认是使用topic去查找配置,如果尝试找不到,则再使用默认的topic去找路由配置信息。
代码@2,@3,@4的关键落点在public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
,接下来我们重点关注updateTopicRouteInfoFromNameServer方法
org.apache.rocketmq.client.impl.factory.MQClientInstance updateTopicRouteInfoFromNameServer源码分析:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { // @1 try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { //@2 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); //@3 } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); //@4 boolean changed = topicRouteDataIsChange(old, topicRouteData); //@5 if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); //@6 } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } if (changed) { //@7 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info //@8 { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info //@9 { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; }
代码@1,为了避免重复从NameServer获取配置信息,在这里使用了ReentrantLock,并且设有超时时间。固定为3000s。
代码@2,@3的区别,一个是获取默认topic的配置信息,一个是获取指定topic的配置信息,该方法在这里就不跟踪进去了,具体的实现就是通过与NameServer的长连接Channel发送GET_ROUTEINTO_BY_TOPIC(105)命令,获取配置信息。注意,次过程的超时时间为3s,由此可见,NameServer的实现要求高效。
代码@4、@5、@6,从这里开始,拿到最新的topic发布信息后,需要与本地缓存中的topic发布信息进行比较,如果有变化,则需要同步更新发送者、消费者关于该topic的缓存。
代码@7 更新发送者的缓存
代码@8,更新订阅者的缓存(消费队列信息)
至此tryToFindTopicPublishInfo运行完毕,从NameServer获取TopicPublishData,继续消息发送的第二个步骤,选取一个消息队列。
2.2.2.2 获取MessageQueue
核心源码:DefaultMQProducerImpl.sendDefaultImpl
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { // @1 try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); //@2 start for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //@2 end if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //@3 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); //@4 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); //@5 start if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); //@5 end } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); //@6 }@1、sendLatencyFaultEnable,是否开启消息失败延迟,改值在消息发送者那里可以设置,如果该值为false,直接从topic的所有队列中选择下一个,而不考虑该消息队列是否可用(比如Broker挂掉)
代码@2-start--end,这里使用了本地线程变量ThreadLocal保存上一次发送的消息队列下标,消息发送使用轮询机制获取下一个发送消息队列。
看到没,代码@2对topic所有的消息队列进行一次,为什么要循环呢?因为加入了发送异常延迟,要确保选中的消息队列(MessageQueue)所在的Broker是正常的。
代码@3,判断当前的消息队列是否可用。
要理解代码@2,@3 处的逻辑,我们就需要理解RocketMQ 发送消息延迟机制,具体实现类:org.apache.rocketmq.client.latency.MQFaultStrategy
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }latencyMax,最大延迟时间数值,再消息发送之前,先记录当前时间(start),然后消息发送成功或失败时记录当前时间(end),(end-start)代表一次消息延迟时间,发送错误时,updateFaultItem中isolation为真,与latencyMax中值进行比较时得值为30s,也就时该broker在接下来得600000L,也就时5分钟内不提供服务,等待该Broker的恢复。
计算出来的延迟值+加上本次消息的延迟值,设置为FaultItem的startTimestamp,表示当前时间必须大于该startTimestamp时,该broker才重新参与MessageQueue的选择。
从@2--@3,一旦一个MessageQueue符合条件,即刻返回,但该Topic所在的所有Broker全部标记不可用时,进入到下一步逻辑处理。(在此处,我们要知道,标记为不可用,并不代表真的不可用,Broker是可用在故障期间被运营管理人员进行恢复的,比如重启)
代码@4,5,就是根据Broker的startTimestart进行一个排序,值越小,排前面,然后再选择一个,返回(此时不能保证一定可用,会抛出异常,如果消息发送方式是同步调用,则有重试机制)
接下来将进入到消息发送的第三步,发现消息。
2.2.2.3 根据MessageQueue向特定的Broker发送消息,源码:sendKernelImpl
本文不深入研究该方法,此刻理解为通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者。
值得注意的是,如果消息发送模式为(SYNC)同步调用时,在生产者实现这边默认提供重试机制,通过(retryTimesWhenSendFailed)参数设置,默认为2,表示重试2次,也就时最多运行3次。
上文主要分析了RocketMQ已同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。
思考题:
1、消息发送时时异常处理思路
1)NameServer挂了
2)Broker挂了
1、消息发送者在同一时刻持有NameServer集群中的一个连接用来及时获取broker等信息(topic路由信息),每一个Topic的队列分散在不同的Broker上,默认topic在Broker中对应4个发送队列,4个消息队列。
消息发送图解:
1、NameServer 挂机
在发送消息阶段,如果生产者本地缓存中没有缓存topic的路由信息,则需要从NameServer获取,只有当所有NameServer都不可用时,此时会抛MQClientException。如果所有的NameServer全部挂掉,并且生产者有缓存Topic的路由信息,此时依然可用发送消息。所以,NameServer的挂掉,通常不会对整个消息发送带来什么严重的问题。
2、Broker挂机
基础知识:消息生产者每隔30s,从NameServer处获取最新的Broker存活信息(topic路由信息)
Broker每30s,向所有的NameServer报告自己的情况,故Broker的down机,Product的最大可感知时间为60s,在这60s,消息发送会有什么影响呢?
此时分两种情况分别进行分析
1)启用sendLatencyFaultEnable
由于使用了故障延迟机制,详细原理见上文详解,会对获取的MQ进行可用性验证,比如获取一个MessageQueue,发送失败,这时会对该Broker进行标记,标记该Broker在未来的某段时间内不会被选择到,默认为(5分钟,不可改变),所有此时只有当该topic全部的broker挂掉,才无法发送消息,符合高可用设计。
2)不启用sendLatencyFaultEnable = false
此时会出现消息发送失败的情况,因为默认情况下,product每次发送消息,会采取轮询机制取下一个MessageQueue,由于可能该Message所在的Broker挂掉,会抛出异常。因为一个Broker默认为一个topic分配4个messageQueue,由于默认只重试2次,故消息有可能发送成功,有可能发送失败。
相关文章推荐
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- RocketMQ源码分析----Broker处理发送请求
- RocketMQ原理解析-producer 4.发送分布式事物消息
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ原理解析-broker 2.消息存储
- RocketMQ原理解析-broker 2.消息存储
- 消息中间件 RocketMQ源码解析:Message顺序发送与消费
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ原理解析-producer 5.消息在broker落地之普通消息
- RocketMQ原理解析-producer 5.消息在broker落地之普通消息
- RocketMQ原理解析-producer 4.发送分布式事物消息
- RocketMQ原理解析-producer 6.消息在broker落地之事物消息
- RocketMQ源码:有序消息分析
- 查看RocketMQ的broker启动部分源码分析总结
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- 消息中间件 RocketMQ源码解析:Message发送&接收
- 源码分析RocketMQ之CommitLog消息存储机制
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId