源码分析RocketMQ消息PULL-长轮询模式
2018-02-23 23:43
741 查看
消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker端可以通过longPollingEnable=true来开启长轮询。
短轮询:longPollingEnable=false,第一次未拉取到消息后等待shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。
长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer#brokerSuspendMaxTimeMillis,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。
1)PullRequestHoldService:每隔5S重试一次。
2)DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1),继续下一次检查。
1、RocketMQ拉轮询拉取任务创建
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
首先看一下processRequest方法参数:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
1)Channel channel:网络通道
2)RemotingCommand request:消息拉取请求
3)brokerAllowSuspend:是否允许挂起,也就是是否允许在未找到消息时暂时挂起线程。第一次调用时默认为true。
代码@1:hasSuspendFlag,构建消息拉取时的拉取标记,默认为true。
代码@2:suspendTimeoutMillisLong:取自DefaultMQPullConsumer的brokerSuspendMaxTimeMillis属性。
代码@3:如果不支持长轮询,则忽略brokerSuspendMaxTimeMillis属性,使用shortPollingTimeMills,默认为1000ms作为下一次拉取消息的等待时间。
代码@4:创建PullRequest,然后提交给PullRequestHoldService线程去调度,触发消息拉取。
代码@5:关键,设置response=null,则此时此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态。
在拉取消息时,如果拉取结果为PULL_NOT_FOUND,在服务端默认会挂起线程,然后根据是否启用长轮询机制,延迟一定时间后再次重试根据偏移量查找消息。
再研究轮询查找消息之前先看一下PullRequest对象的核心属性:
1)requestCommand:请求命令
2)clientChannel:网络连接,通过该通道向客户端返回响应结果。
3)timeoutMills:超时时长
4)suspendTimestamp:挂起开始时间戳,如果当前系统>=(timeoutMills+suspendTimestamp)表示已超时。
5)pullFromThisOffset:带拉取消息队列偏移量
6)subscriptionData:订阅信息。
7)messageFilter:消息过滤器
RocketMQ轮询机制由两个线程共同来完成:
1)PullRequestHoldService:每隔5S重试一次。
2)DefaultMessageStore#ReputMessageService,每处理一次重新拉取,Thread.sleep(1),继续下一次检查。
2、源码分析PullRequestHoldService线程
PullRequestHoldService详解:
2.1、PullRequestHoldService#suspendPullRequest
根据主题名称+队列id,获取ManyPullRequest,对于同一个topic+队列的拉取请求用ManyPullRequest包装,然后将pullRequest添加到ManyPullRequest中。
2.2、run方法详解
PullRequestHoldService#run
代码@1,如果开启了长轮询模式,则每次只挂起5S,然后就去尝试拉取。
代码@2,如果不开启长轮询模式,则只挂起一次,挂起时间为shortPollingTimeMills,然后去尝试查找消息。
代码@3,遍历pullRequestTable,如果拉取任务的待拉取偏移量小于当前队列的最大偏移量时执行拉取,否则如果没有超过最大等待时间则等待,否则返回未拉取到消息,返回给消息拉取客户端。具体请看:
PullRequestHoldService#checkHoldRequest
代码@1:根据主题,消费队列ID查找队列的最大偏移量。
代码@2:根据该offset,判断是否有新的消息达到。
PullRequestHoldService#notifyMessageArriving
代码@1:参数详解:
String topic:主题名称
int queueId:队列id
long maxOffset:消费队列当前最大偏移量
Long tagsCode:消息tag hashcode //基于tag消息过滤
long msgStoreTime:消息存储时间
byte[] filterBitMap:过滤位图
Map
这个方法值得注意一下,为什么addPullRequest方法会加锁呢?原因是ReputMessageService内部其实会持有PullRequestHoldService的引用,也就是在运行过程中,对于拉取任务,ReputMessageService、PullRequestHoldService处理的任务是同一个集合。
继续代码@5,重点跟进一下executeRequestWhenWakeup方法。
PullMessageProcessor#executeRequestWhenWakeup
这里的核心亮点是:在调用PullMessageProcessor.this.processRequest(channel, request, false)方法是,最后一个参数是false,表示broker端不支持挂起,这样在PullMessageProcessor方法中,如果没有查找消息,也不会继续再挂起了,因为进入这个方法时,拉取的偏移量是小于队列的最大偏移量,正常情况下是可以拉取到消息的。
3、源码分析DefaultMessageStore#ReputMessageService
ReputMessageService启动入口:
DefaultMessageStore#satart方法:
首先,设置reputFromOffset偏移量,如果允许重复,初始偏移量为confirmOffset,否则设置为commitLog当前最大偏移量。
3.1 run方法
从run方法可以看出ReputMessageService线程是一个“拼命三郎”,每执行完一次业务方法doReput,休息1毫秒,就继续抢占CPU,再次执行doReput方法。
3.2 doReput
代码@1:根据偏移量读取偏移量+到commitlog文件中有效数据的最大偏移量。如果未找到数据,结束doReput方法。
代码@2:循环从SelectMappedBufferResult中读取消息,每次读取一条。
代码@3:从SelectMappedBufferResult中读取一条消息,生成DispatchRequest对象。
代码@4:根据comitlog文件内容实时构建consumequeue、index文件的关键所在,该部分详情请参考:http://blog.csdn.net/prestigeding/article/details/79156276
代码@5:如果开启了长轮询并且角色为主节点,则通知有新消息到达,执行一次pullRequest验证。
NotifyMessageArrivingListener代码,最终调用pullRequestHoldService的notifyMessageArriving方法,进行一次消息拉取。
只要待拉取偏移量小于消息消费队列的最大偏移量,既可以被唤醒进行消息拉取。
rocketmq消息拉取长轮询机制就介绍到这里。
首先,要开启长轮询, 在broker配置文件中longPollingEnable=true,默认是开启的。
然后在在broker端根据偏移量去消息存储文件中查找消息时,如果未找到,会挂起线程,然后轮询查找消息。所谓的轮询是轮询待拉取消息偏移大于消息消费队列的最大偏移量时才挂起,一旦检测发现待拉取消息偏移量小于消费队列最大偏移量时,则尝试拉取消息,结束长轮询过程。
短轮询:longPollingEnable=false,第一次未拉取到消息后等待shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。
长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer#brokerSuspendMaxTimeMillis,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。
1)PullRequestHoldService:每隔5S重试一次。
2)DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1),继续下一次检查。
1、RocketMQ拉轮询拉取任务创建
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
首先看一下processRequest方法参数:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
1)Channel channel:网络通道
2)RemotingCommand request:消息拉取请求
3)brokerAllowSuspend:是否允许挂起,也就是是否允许在未找到消息时暂时挂起线程。第一次调用时默认为true。
case ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { // @1 long pollingTimeMills = suspendTimeoutMillisLong; // @2 if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { // @3 pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); i 4000 nt queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); // @4 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; // @5 break; }
代码@1:hasSuspendFlag,构建消息拉取时的拉取标记,默认为true。
代码@2:suspendTimeoutMillisLong:取自DefaultMQPullConsumer的brokerSuspendMaxTimeMillis属性。
代码@3:如果不支持长轮询,则忽略brokerSuspendMaxTimeMillis属性,使用shortPollingTimeMills,默认为1000ms作为下一次拉取消息的等待时间。
代码@4:创建PullRequest,然后提交给PullRequestHoldService线程去调度,触发消息拉取。
代码@5:关键,设置response=null,则此时此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态。
在拉取消息时,如果拉取结果为PULL_NOT_FOUND,在服务端默认会挂起线程,然后根据是否启用长轮询机制,延迟一定时间后再次重试根据偏移量查找消息。
再研究轮询查找消息之前先看一下PullRequest对象的核心属性:
1)requestCommand:请求命令
2)clientChannel:网络连接,通过该通道向客户端返回响应结果。
3)timeoutMills:超时时长
4)suspendTimestamp:挂起开始时间戳,如果当前系统>=(timeoutMills+suspendTimestamp)表示已超时。
5)pullFromThisOffset:带拉取消息队列偏移量
6)subscriptionData:订阅信息。
7)messageFilter:消息过滤器
RocketMQ轮询机制由两个线程共同来完成:
1)PullRequestHoldService:每隔5S重试一次。
2)DefaultMessageStore#ReputMessageService,每处理一次重新拉取,Thread.sleep(1),继续下一次检查。
2、源码分析PullRequestHoldService线程
PullRequestHoldService详解:
2.1、PullRequestHoldService#suspendPullRequest
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); }
根据主题名称+队列id,获取ManyPullRequest,对于同一个topic+队列的拉取请求用ManyPullRequest包装,然后将pullRequest添加到ManyPullRequest中。
2.2、run方法详解
PullRequestHoldService#run
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); // @1 } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); //@2 } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); // @3 long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
代码@1,如果开启了长轮询模式,则每次只挂起5S,然后就去尝试拉取。
代码@2,如果不开启长轮询模式,则只挂起一次,挂起时间为shortPollingTimeMills,然后去尝试查找消息。
代码@3,遍历pullRequestTable,如果拉取任务的待拉取偏移量小于当前队列的最大偏移量时执行拉取,否则如果没有超过最大等待时间则等待,否则返回未拉取到消息,返回给消息拉取客户端。具体请看:
PullRequestHoldService#checkHoldRequest
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); // @1 try { this.notifyMessageArriving(topic, queueId, offset); // @2 } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } }
代码@1:根据主题,消费队列ID查找队列的最大偏移量。
代码@2:根据该offset,判断是否有新的消息达到。
PullRequestHoldService#notifyMessageArriving
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { // @1 String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); // @2 if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { // @3 newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { // @4 boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { // @5 try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); // @6 } } } }
代码@1:参数详解:
String topic:主题名称
int queueId:队列id
long maxOffset:消费队列当前最大偏移量
Long tagsCode:消息tag hashcode //基于tag消息过滤
long msgStoreTime:消息存储时间
byte[] filterBitMap:过滤位图
Map
public synchronized void addPullRequest(final List<PullRequest> many) { this.pullRequestList.addAll(many); }
这个方法值得注意一下,为什么addPullRequest方法会加锁呢?原因是ReputMessageService内部其实会持有PullRequestHoldService的引用,也就是在运行过程中,对于拉取任务,ReputMessageService、PullRequestHoldService处理的任务是同一个集合。
继续代码@5,重点跟进一下executeRequestWhenWakeup方法。
PullMessageProcessor#executeRequestWhenWakeup
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { try { final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false); if (response != null) { response.setOpaque(request.getOpaque()); response.markResponseType(); try { channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { log.error("processRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); log.error(request.toString()); log.error(response.toString()); } } }); } catch (Throwable e) { log.error("processRequestWrapper process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); } } } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); }
这里的核心亮点是:在调用PullMessageProcessor.this.processRequest(channel, request, false)方法是,最后一个参数是false,表示broker端不支持挂起,这样在PullMessageProcessor方法中,如果没有查找消息,也不会继续再挂起了,因为进入这个方法时,拉取的偏移量是小于队列的最大偏移量,正常情况下是可以拉取到消息的。
3、源码分析DefaultMessageStore#ReputMessageService
ReputMessageService启动入口:
DefaultMessageStore#satart方法:
if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } this.reputMessageService.start();
首先,设置reputFromOffset偏移量,如果允许重复,初始偏移量为confirmOffset,否则设置为commitLog当前最大偏移量。
3.1 run方法
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
从run方法可以看出ReputMessageService线程是一个“拼命三郎”,每执行完一次业务方法doReput,休息1毫秒,就继续抢占CPU,再次执行doReput方法。
3.2 doReput
private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); // @1 if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { //@2 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); //@3 int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); // @4 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { // @5 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
代码@1:根据偏移量读取偏移量+到commitlog文件中有效数据的最大偏移量。如果未找到数据,结束doReput方法。
代码@2:循环从SelectMappedBufferResult中读取消息,每次读取一条。
代码@3:从SelectMappedBufferResult中读取一条消息,生成DispatchRequest对象。
代码@4:根据comitlog文件内容实时构建consumequeue、index文件的关键所在,该部分详情请参考:http://blog.csdn.net/prestigeding/article/details/79156276
代码@5:如果开启了长轮询并且角色为主节点,则通知有新消息到达,执行一次pullRequest验证。
NotifyMessageArrivingListener代码,最终调用pullRequestHoldService的notifyMessageArriving方法,进行一次消息拉取。
只要待拉取偏移量小于消息消费队列的最大偏移量,既可以被唤醒进行消息拉取。
public class NotifyMessageArrivingListener implements MessageArrivingListener { private final PullRequestHoldService pullRequestHoldService; public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) { this.pullRequestHoldService = pullRequestHoldService; } @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); } }
rocketmq消息拉取长轮询机制就介绍到这里。
首先,要开启长轮询, 在broker配置文件中longPollingEnable=true,默认是开启的。
然后在在broker端根据偏移量去消息存储文件中查找消息时,如果未找到,会挂起线程,然后轮询查找消息。所谓的轮询是轮询待拉取消息偏移大于消息消费队列的最大偏移量时才挂起,一旦检测发现待拉取消息偏移量小于消费队列最大偏移量时,则尝试拉取消息,结束长轮询过程。
相关文章推荐
- 源码分析RocketMQ消息拉取拉模式PULL
- 源码分析RocketMQ消息过滤机制上篇-----消息消费服务端过滤与TAG模式过滤实现
- 源码分析RocketMQ消息过滤机制下篇-FilterServer、ClassFilter模式详解
- rocketmq源码分析3-consumer消息获取
- 阿里消息队列中间件 RocketMQ 源码分析 —— Message 拉取与消费(上)
- RocketMQ——Consumer篇:PULL模式下的消息消费(DefaultMQPullConsumer)
- 源码分析RocketMQ消息消费机制----消费者拉取消息机制
- 消息系统该Push/Pull模式分析
- rocketmq源码分析2-broker的消息接收
- 消息系统该Push/Pull模式分析
- RocketMQ源码分析----消息存储
- 源码分析RocketMQ顺序消息消费实现原理
- 分布式消息队列 RocketMQ 源码分析——高可用
- RocketMQ源码分析----消费消息
- RocketMQ源码:有序消息分析
- 消息系统该Push/Pull模式分析
- RocketMQ 源码分析 事务消息
- [转] 消息系统该Push/Pull模式分析
- 消息系统Push/Pull模式分析
- RocketMQ 源码分析 定时消息与消息重试