RocketMQ源码深度解析五之Consumer篇
2017-08-09 15:21
686 查看
前言:RocketMQ 提供了两种消费模式, PUSH 和 PULL,大多数场景使用的是PUSH模式,这两种模式分别对应的是 DefaultMQPushConsumer 类和DefaultMQPullConsumer 类。 PUSH 模式实际上在内部还是使用的 PULL 方式实现的,通过 PULL 不断地轮询 Broker 获取消息,当不存在新消息时, Broker 会挂起 PULL 请求,直到有新消息产生才取消挂起,返回新消息。故此处主要讲解PUSH模式, 即 DefaultMQPushConsumer。
Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如何实现分布式的Consumer消费,如何保证Consumer的顺序性,不重复性呢?
(一)启动consumer
初始化DefaultMQPushConsumer,并设置变量值:
1,用consumerGroup为参数初始化DefaultMQPushConsumer 对象,默认初始化MessageQueue分配策略AllocateMessageQueueAveragely 赋值给DefaultMQPushConsumer.allocateMessageQueueStrategy 变量;
2,设置NameServer值
3,保存应用层的topic-subExpression 值
4、调用 DefaultMQPushConsumer.subscribe方法用于构建 Consumer 端的订阅数据 SubscriptionData 对象。
5、设置 Conumser 线程的最大值,最小值,从何处开始消费等调用
6、DefaultMQPushConsumer.registerMessageListener(MessageListener messageListener)方法设置拉取消息后的回调类
启动 DefaultMQPushConsumer
调用 DefaultMQPushConsumer.start 方法中,直接调用DefaultMQPushConsumerImpl.start 方法,大致逻辑如下:
(二)RebalanceImpl的解读
该类是负载均衡的相关类,接下来就解读它的几个重要的方法
1,
2,
(三)Push模式下的消息消费
(1)拉取消息(pullMessage)
(四)顺序消费的逻辑
顺序消费存在三把锁:1) ConsumeMessageOrderlyService 类中的定时任务会每隔20秒钟执行一次lockMQPeriodically方法,获取该Consumer客户端在Broker端锁住的MessageQueue集合(分布式锁),并通过该集合获取对应的ProcessQueue(消息处理队列)加上本地锁和加锁时间,目的是为了消费时在本地检查消费队列是否被锁住。2)在进行消息队列的消费过程中,对MessageQueue对象进行本地同步锁,保证同一个时间只允许一个线程消息一个ConsumeQueue队列。3) 在回调业务层定义ConsumeMessageOrderlyService.messageListener:MessageListenerOrderly
类的 consumeMessage 方法之前获取 ProcessQueue.lockConsume:ReentrantLock变量的锁即消费处理队列的锁, 该锁的粒度比消息队列的同步锁粒度更小。
主要看下ConsumeMessageOrderlyService内部类ConsumeRequest.run方法
(五)并发消费的逻辑
主要涉及ConsumeMessageConcurrentlyService内部类ConsumeRequest.run方法。接下来看些些基本逻辑:
1,判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息
2.构建并行消费上下文
3,给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic为%RETRY%groupname 的队列中
4,调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消费结果
5,如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险
6,处理消费结果
6.1)消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,
6.2)但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记
6.3)成功消费到哪里了
6.4)消费失败, ackIndex 设置为-1
6.5)广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理。集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在 cosumer 的本地线程定时 5秒钟以后重试重新消费消息, 在走一次上面的消费流程。
7.删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度
8.更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定时更新到 broker 上去
Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如何实现分布式的Consumer消费,如何保证Consumer的顺序性,不重复性呢?
(一)启动consumer
初始化DefaultMQPushConsumer,并设置变量值:
1,用consumerGroup为参数初始化DefaultMQPushConsumer 对象,默认初始化MessageQueue分配策略AllocateMessageQueueAveragely 赋值给DefaultMQPushConsumer.allocateMessageQueueStrategy 变量;
2,设置NameServer值
3,保存应用层的topic-subExpression 值
4、调用 DefaultMQPushConsumer.subscribe方法用于构建 Consumer 端的订阅数据 SubscriptionData 对象。
5、设置 Conumser 线程的最大值,最小值,从何处开始消费等调用
6、DefaultMQPushConsumer.registerMessageListener(MessageListener messageListener)方法设置拉取消息后的回调类
启动 DefaultMQPushConsumer
调用 DefaultMQPushConsumer.start 方法中,直接调用DefaultMQPushConsumerImpl.start 方法,大致逻辑如下:
public synchronized void start() throws MQClientException { //检查 DefaultMQPushConsumerImpl.ServiceState 的状态 switch (this.serviceState) { //只有状态为 CREATE_JUST 时才启动该 Producer; case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); //将 DefaultMQPushConsumerImpl.ServiceState 置为 start_failed,防止一个进程中重复启动 this.serviceState = ServiceState.START_FAILED; //检查参数是否正确 this.checkConfig(); //对 DefaultMQPushConsumer.subscription 变量进行的设置,要将此 Map 变量值转换为 SubscriptionData 对象; this.copySubscription(); //若消息模式为集群模式且实例名( instanceName)等于“ DEFAULT”,则 //取该进程的 PID 作为该 Consumer 的实例名( instanceName); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } //创建 MQClientInstance 对象,需要判断是否已经创建 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); //设置参数 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); //初始化 PullAPIWrapper 对象 this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); //注册 FilterMessageHook 列表,用于消息的过滤; this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); //若在应用层通过 DefaultMQPushConsumer.setOffsetStore if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { //若消息模式是广播( BROADCASTING),则初始化LocalFileOffsetStore 对象并赋值给 case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; //若消息模式是集群( CLUSTERING),则初始化 RemoteBrokerOffsetStore 对象并赋值给DefaultMQPushConsumerImpl.offsetStore 变量 case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } } //将本地的 offsets.json 文件内容加载到 LocalFileOffsetStore.offsetTable 变量中 this.offsetStore.load(); //若是顺序消费 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } /* ConsumeMessageConcurrentlyService 类的 start 方法没有实现任何业务逻辑; ConsumeMessageOrderlyService 的 start 方法的处理逻辑是当消息模式为 集群模式时,设置定时任务每隔 20 秒调用一次ConsumeMessageOrderlyService.lockMQPeriodically()方法, */ this.consumeMessageService.start(); //将 DefaultMQPushConsumerImpl 对象在 MQClientInstance 中注册 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); //说明在一个客户端的一个进程下面启动多个Consumer 时 consumerGroup 名字不能一样,否则无法启动; if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } //启动 MQClientInstance 对象 mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); //设置 DefaultMQPushConsumerImpl 的 ServiceState 为 RUNNING; this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// + this.serviceState// + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //更新 topic 的配置信息; this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); /* 第一,向所有在 MQClientInstance.brokerAddrTable 列表中 的 Broker 发送心跳消息;第二,向 Filter 过滤服务器发送 REGISTER_MESSAGE_FILTER_CLASS 请求码,更新过滤服务器中的 Filterclass 文件; */ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); }
(二)RebalanceImpl的解读
该类是负载均衡的相关类,接下来就解读它的几个重要的方法
1,
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { //广播模式 case BROADCASTING: { //获取MessageQueue 集合; Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); //为true说明RebalanceImpl.processQueueTable 列表有变化 if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", // consumerGroup, // topic, // mqSet, // mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } //集群模式 case CLUSTERING: { //获取MessageQueue 集合; Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //获取以该 consumerGroup 为名的 ClientId 集合 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //进行排序 Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { //为当前Consumer分配消费的MessageQueue allocateResult = strategy.allocate(// this.consumerGroup, // this.mQClientFactory.getClientId(), // mqAll, // cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
2,
rivate boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); //参数topic等于MessageQueue的topic值 if (mq.getTopic().equals(topic)) { //不在参数的集合中 if (!mqSet.contains(mq)) { //对应的 ProcessQueue 对象的 dropped 变量设置为true pq.setDropped(true); //如果删除消费进度成功 if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } //如果距离上次拉取时间已经超时 else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; //消费类型为被动消费:push模式 case CONSUME_PASSIVELY: //将该记录对应的 ProcessQueue 对象的dropped 变量置为 true pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); //遍历集合 for (MessageQueue mq : mqSet) { //如果不在map中 if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); //获取该MessageQueue对象的下一个消费偏移值 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { //加入map中国 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //构造PullRequest后,加入pullRequestList中 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } /* 目的:将 PullRequest 对象放入 PullMessageService 服务线程的 pullRequestQueue 队列中 */ this.dispatchPullRequest(pullRequestList); return changed; }
(三)Push模式下的消息消费
(1)拉取消息(pullMessage)
public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); //检查ProcessQueue 对象的 dropped 是否为 true if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } //更新时间戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); //检查该 Consumer 是否运行 try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); //3 秒之后再次将该 PullRequest 对象放入 PullMessageService.pullRequestQueue 队列中 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } //获取msgCount long size = processQueue.getMsgCount().get(); //如果大于消费端的流控阈值 if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { //50ms后重新该PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((flowControlTimes1++ % 1000) == 0) { log.warn( "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); } return; } //如果不是顺序消费 if (!this.consumeOrderly) { //ProcessQueue 对象的 msgTreeMap的第一个 key 值与最后一个 key 值之间的差额大于阀值 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { //50ms后重新该PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((flowControlTimes2++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, flowControlTimes2); } return; } } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { //获取偏移量 final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } //更新状态 pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } //省略匿名内部类PullCallback //接下来更新订阅关系 }
(四)顺序消费的逻辑
顺序消费存在三把锁:1) ConsumeMessageOrderlyService 类中的定时任务会每隔20秒钟执行一次lockMQPeriodically方法,获取该Consumer客户端在Broker端锁住的MessageQueue集合(分布式锁),并通过该集合获取对应的ProcessQueue(消息处理队列)加上本地锁和加锁时间,目的是为了消费时在本地检查消费队列是否被锁住。2)在进行消息队列的消费过程中,对MessageQueue对象进行本地同步锁,保证同一个时间只允许一个线程消息一个ConsumeQueue队列。3) 在回调业务层定义ConsumeMessageOrderlyService.messageListener:MessageListenerOrderly
类的 consumeMessage 方法之前获取 ProcessQueue.lockConsume:ReentrantLock变量的锁即消费处理队列的锁, 该锁的粒度比消息队列的同步锁粒度更小。
主要看下ConsumeMessageOrderlyService内部类ConsumeRequest.run方法
@Override public void run() { //检查 ProcessQueue.dropped 是否为 true,若是则直接返回; if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } //对 MessageQueue 对象加锁,保证同一时间只允许一个线程使用该MessageQueue 对象 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { //若消息模式是广播或者对应的 ProcessQueue.locked 等于 true 且锁的时间未过期 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } //若消息模式是集群并且 ProcessQueue.locked 不等于 true( 未锁住)或者锁住了但是锁已经超时 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } //获取一次批量消费的消息个数 batchSize final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); //takeMessages从ProcessQueue.msgTreeMap变量中获取 batchSize个数的 List<MessageExt>列表;并且从 msgTreeMap 中删除,存入临时变量 msgTreeMapTemp 中 List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // init the consume context type consumeMessageContext.setProps(new HashMap<String, String>()); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { //防止在消费的过程中,被其他线程将此消费队列解锁了,从而引起并发消费的问题 this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } //保证同一个 ProcessQueue 在同一时间只能有一个线程调用 consumeMessage 方法 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // RemotingHelper.exceptionSimpleDesc(e), // ConsumeMessageOrderlyService.this.consumerGroup, // msgs, // messageQueue); hasException = true; } finally { //释放锁 this.processQueue.getLockConsume().unlock(); } if (null == status // || ConsumeOrderlyStatus.ROLLBACK == status// || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // ConsumeMessageOrderlyService.this.consumerGroup, // msgs, // messageQueue); } //对返回的state进行判断 long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageOrderlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); //处理回调方法 consumeMessage 的执行结果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
(五)并发消费的逻辑
主要涉及ConsumeMessageConcurrentlyService内部类ConsumeRequest.run方法。接下来看些些基本逻辑:
1,判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息
2.构建并行消费上下文
3,给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic为%RETRY%groupname 的队列中
4,调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消费结果
5,如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险
6,处理消费结果
6.1)消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,
6.2)但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记
6.3)成功消费到哪里了
6.4)消费失败, ackIndex 设置为-1
6.5)广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理。集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在 cosumer 的本地线程定时 5秒钟以后重试重新消费消息, 在走一次上面的消费流程。
7.删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度
8.更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定时更新到 broker 上去
相关文章推荐
- 【RocketMQ源码深度解析】整体介绍&IDE编译并启动RocketMQ的第一个例子
- 【RocketMQ源码深度解析2】源码目录结构介绍&Remoting通信层
- RocketMQ源码解析-Consumer启动(2)
- RocketMQ原理解析-consumer 1.启动
- Kafka源码深度解析-序列6 -Consumer -消费策略分析
- 消息中间件 RocketMQ源码解析:高可用
- RocketMQ原理解析-consumer 3.长轮询
- Rocket MQ consumer 源码分析(绝对干货)
- RocketMq之Netty通讯源码解析
- RocketMQ原理解析-consumer 7.shutdown
- RocketMQ原理解析-consumer 5.push消费-顺序消费消息
- RocketMQ原理解析-consumer 2.消费端负载均衡
- 消息中间件 RocketMQ源码解析:Message存储
- RocketMQ4.1.0源码解析--Consumer的启动过程
- RocketMQ原理解析-consumer 3.长轮询
- RocketMQ原理解析-consumer 7.shutdown
- RocketMQ源码解析:Message拉取&消费(上)
- 消息中间件 RocketMQ源码解析:事务消息
- 消息中间件 RocketMQ源码解析:Message拉取&消费(下)
- RocketMQ源码解析-PullConsumer取消息(1)