您的位置:首页 > 其它

RocketMQ源码深度解析五之Consumer篇

2017-08-09 15:21 686 查看
前言:RocketMQ 提供了两种消费模式, PUSH 和 PULL,大多数场景使用的是PUSH模式,这两种模式分别对应的是 DefaultMQPushConsumer 类和DefaultMQPullConsumer 类。 PUSH 模式实际上在内部还是使用的 PULL 方式实现的,通过 PULL 不断地轮询 Broker 获取消息,当不存在新消息时, Broker 会挂起 PULL 请求,直到有新消息产生才取消挂起,返回新消息。故此处主要讲解PUSH模式, 即 DefaultMQPushConsumer。

Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如何实现分布式的Consumer消费,如何保证Consumer的顺序性,不重复性呢?

(一)启动consumer

初始化DefaultMQPushConsumer,并设置变量值:

1,用consumerGroup为参数初始化DefaultMQPushConsumer 对象,默认初始化MessageQueue分配策略AllocateMessageQueueAveragely 赋值给DefaultMQPushConsumer.allocateMessageQueueStrategy 变量;

2,设置NameServer值

3,保存应用层的topic-subExpression 值

4、调用 DefaultMQPushConsumer.subscribe方法用于构建 Consumer 端的订阅数据 SubscriptionData 对象。

5、设置 Conumser 线程的最大值,最小值,从何处开始消费等调用

6、DefaultMQPushConsumer.registerMessageListener(MessageListener messageListener)方法设置拉取消息后的回调类

启动 DefaultMQPushConsumer

调用 DefaultMQPushConsumer.start 方法中,直接调用DefaultMQPushConsumerImpl.start 方法,大致逻辑如下:

public synchronized void start() throws MQClientException {
//检查 DefaultMQPushConsumerImpl.ServiceState 的状态
switch (this.serviceState) {
//只有状态为 CREATE_JUST 时才启动该 Producer;
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
//将 DefaultMQPushConsumerImpl.ServiceState 置为 start_failed,防止一个进程中重复启动
this.serviceState = ServiceState.START_FAILED;
//检查参数是否正确
this.checkConfig();
//对 DefaultMQPushConsumer.subscription 变量进行的设置,要将此 Map 变量值转换为 SubscriptionData 对象;
this.copySubscription();
//若消息模式为集群模式且实例名( instanceName)等于“ DEFAULT”,则
//取该进程的 PID 作为该 Consumer 的实例名( instanceName);
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//创建 MQClientInstance 对象,需要判断是否已经创建
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//设置参数
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//初始化 PullAPIWrapper 对象
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
//注册 FilterMessageHook 列表,用于消息的过滤;
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//若在应用层通过 DefaultMQPushConsumer.setOffsetStore
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
//若消息模式是广播( BROADCASTING),则初始化LocalFileOffsetStore 对象并赋值给
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
//若消息模式是集群( CLUSTERING),则初始化 RemoteBrokerOffsetStore 对象并赋值给DefaultMQPushConsumerImpl.offsetStore 变量
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
//将本地的 offsets.json 文件内容加载到 LocalFileOffsetStore.offsetTable 变量中
this.offsetStore.load();
//若是顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
/*
ConsumeMessageConcurrentlyService 类的 start 方法没有实现任何业务逻辑;
ConsumeMessageOrderlyService 的 start 方法的处理逻辑是当消息模式为
集群模式时,设置定时任务每隔 20 秒调用一次ConsumeMessageOrderlyService.lockMQPeriodically()方法,
*/
this.consumeMessageService.start();
//将 DefaultMQPushConsumerImpl 对象在 MQClientInstance 中注册
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
//说明在一个客户端的一个进程下面启动多个Consumer 时 consumerGroup 名字不能一样,否则无法启动;
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//启动 MQClientInstance 对象
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
//设置 DefaultMQPushConsumerImpl 的 ServiceState 为 RUNNING;
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//更新 topic 的配置信息;
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
/*
第一,向所有在 MQClientInstance.brokerAddrTable 列表中
的 Broker 发送心跳消息;第二,向 Filter 过滤服务器发送
REGISTER_MESSAGE_FILTER_CLASS 请求码,更新过滤服务器中的 Filterclass 文件;
*/
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}


(二)RebalanceImpl的解读

该类是负载均衡的相关类,接下来就解读它的几个重要的方法

1,

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
//广播模式
case BROADCASTING: {
//获取MessageQueue 集合;
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//为true说明RebalanceImpl.processQueueTable 列表有变化
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", //
consumerGroup, //
topic, //
mqSet, //
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
//集群模式
case CLUSTERING: {
//获取MessageQueue 集合;
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取以该 consumerGroup 为名的 ClientId 集合
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//进行排序
Collections.sort(mqAll);
Collections.sort(cidAll);

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
//为当前Consumer分配消费的MessageQueue
allocateResult = strategy.allocate(//
this.consumerGroup, //
this.mQClientFactory.getClientId(), //
mqAll, //
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}


2,

rivate boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
//参数topic等于MessageQueue的topic值
if (mq.getTopic().equals(topic)) {
//不在参数的集合中
if (!mqSet.contains(mq)) {
//对应的 ProcessQueue 对象的 dropped 变量设置为true
pq.setDropped(true);
//如果删除消费进度成功
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
}
//如果距离上次拉取时间已经超时
else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
//消费类型为被动消费:push模式
case CONSUME_PASSIVELY:
//将该记录对应的 ProcessQueue 对象的dropped 变量置为 true
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
//遍历集合
for (MessageQueue mq : mqSet) {
//如果不在map中
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}

this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
//获取该MessageQueue对象的下一个消费偏移值
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
//加入map中国
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//构造PullRequest后,加入pullRequestList中
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
/*
目的:将 PullRequest 对象放入 PullMessageService
服务线程的 pullRequestQueue 队列中
*/
this.dispatchPullRequest(pullRequestList);

return changed;
}


(三)Push模式下的消息消费

(1)拉取消息(pullMessage)

public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//检查ProcessQueue 对象的 dropped 是否为 true
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
//更新时间戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
//检查该 Consumer 是否运行
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
//3 秒之后再次将该 PullRequest 对象放入 PullMessageService.pullRequestQueue 队列中
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}

if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
//获取msgCount
long size = processQueue.getMsgCount().get();
//如果大于消费端的流控阈值
if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
//50ms后重新该PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes1++ % 1000) == 0) {
log.warn(
"the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
}
return;
}
//如果不是顺序消费
if (!this.consumeOrderly) {
//ProcessQueue 对象的 msgTreeMap的第一个 key 值与最后一个 key 值之间的差额大于阀值
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
//50ms后重新该PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes2++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, flowControlTimes2);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
//获取偏移量
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
//更新状态
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
//省略匿名内部类PullCallback
//接下来更新订阅关系
}


(四)顺序消费的逻辑

顺序消费存在三把锁:1) ConsumeMessageOrderlyService 类中的定时任务会每隔20秒钟执行一次lockMQPeriodically方法,获取该Consumer客户端在Broker端锁住的MessageQueue集合(分布式锁),并通过该集合获取对应的ProcessQueue(消息处理队列)加上本地锁和加锁时间,目的是为了消费时在本地检查消费队列是否被锁住。2)在进行消息队列的消费过程中,对MessageQueue对象进行本地同步锁,保证同一个时间只允许一个线程消息一个ConsumeQueue队列。3) 在回调业务层定义ConsumeMessageOrderlyService.messageListener:MessageListenerOrderly

类的 consumeMessage 方法之前获取 ProcessQueue.lockConsume:ReentrantLock变量的锁即消费处理队列的锁, 该锁的粒度比消息队列的同步锁粒度更小。

主要看下ConsumeMessageOrderlyService内部类ConsumeRequest.run方法

@Override
public void run() {
//检查 ProcessQueue.dropped 是否为 true,若是则直接返回;
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
//对 MessageQueue 对象加锁,保证同一时间只允许一个线程使用该MessageQueue 对象
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//若消息模式是广播或者对应的 ProcessQueue.locked 等于 true 且锁的时间未过期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
//若消息模式是集群并且 ProcessQueue.locked 不等于 true( 未锁住)或者锁住了但是锁已经超时
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
//获取一次批量消费的消息个数 batchSize
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//takeMessages从ProcessQueue.msgTreeMap变量中获取 batchSize个数的 List<MessageExt>列表;并且从 msgTreeMap 中删除,存入临时变量 msgTreeMapTemp 中
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//防止在消费的过程中,被其他线程将此消费队列解锁了,从而引起并发消费的问题
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//保证同一个 ProcessQueue 在同一时间只能有一个线程调用 consumeMessage 方法
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
RemotingHelper.exceptionSimpleDesc(e), //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
hasException = true;
} finally {
//释放锁
this.processQueue.getLockConsume().unlock();
}

if (null == status //
|| ConsumeOrderlyStatus.ROLLBACK == status//
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
ConsumeMessageOrderlyService.this.consumerGroup, //
msgs, //
messageQueue);
}
//对返回的state进行判断
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//处理回调方法 consumeMessage 的执行结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}


(五)并发消费的逻辑

主要涉及ConsumeMessageConcurrentlyService内部类ConsumeRequest.run方法。接下来看些些基本逻辑:

1,判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息

2.构建并行消费上下文

3,给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic为%RETRY%groupname 的队列中

4,调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消费结果

5,如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险

6,处理消费结果

6.1)消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,

6.2)但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记

6.3)成功消费到哪里了

6.4)消费失败, ackIndex 设置为-1

6.5)广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理。集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在 cosumer 的本地线程定时 5秒钟以后重试重新消费消息, 在走一次上面的消费流程。

7.删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度

8.更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定时更新到 broker 上去

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rocketmq 源码