您的位置:首页 > 编程语言

48. 源代码解读-RocketMQ-client接收消息流程

2018-01-01 08:52 495 查看

1. 简介

消费消息可以分成pull和push方式,push消息使用比较简单,因为RocketMQ已经帮助我们封装了大部分流程,我们只要重写回调函数即可。

下面我们就以push消费方式为例,分析下这部分源代码流程。

2. 消费者启动流程图



3.消费者类图



4. 消费者源代码流程

4.1 消费客户端启动

根据官方(https://github.com/apache/rocketmq)提供的例子,Consumer.java里面使用DefaultMQPushConsumer启动消息消费者,如下

//初始化DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//设置命名服务,参考namesrv的启动
consumer.setNamesrvAddr("localhost:9876");
//设置消费起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消费的主题和过滤符
consumer.subscribe("TopicTest", "*");
//设置消息回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();

4.2 消息者启动

我们接着看consumer.start()方法

@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}

DefaultMQPushConsumerImpl.java

public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...

this.checkConfig();//检查参数

...

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

...

this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

...
this.offsetStore.load();

...

this.consumeMessageService.start();

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
...

mQClientFactory.start();

this.serviceState = ServiceState.RUNNING;
...
}

...
}

在初始化一堆参数之后,然后调用mQClientFactory.start();

private MQClientInstance mQClientFactory;

其实这个命名有点奇怪啊(阿里程序员手抖了?),为什么MQClientInstance类型的变量名称叫mQClientFactory ...

那继续看MQClientInstance的start

4.3 MQClientInstance

public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
...
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
...
}
}
}

各行代码的作用就像源代码里面的注释一样,重点看下pullMessageService.start和rebalanceService.start
pullMessageService.start作用是不断从一个阻塞队列里面获取pullRequest请求,然后去RocketMQ broker里面获取消息。
如果没有pullRequest的话,那么它将阻塞。
那么,pullRequest请求是怎么放进去的呢?这个就要看rebalanceService了。

4.4 pullMessageService.start

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

@Override
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
..
}
}
}

顺便说一句,pullMessageService和rebalanceService都是继承自ServiceThread

public class PullMessageService extends ServiceThread {}

ServiceThread简单封装了线程的启动,调用start方法,就会调用它的run方法。

public ServiceThread() {
this.thread = new Thread(this, this.getServiceName()); //把当前对象作为runnable传入线程构造函数
}

public void start() {
this.thread.start();
}

这样启动线程就要方便一点,看起来舒服一点。

嗯,继续分析之前的分析。

从pullMessageService的run方法可以看出它是从阻塞队列pullRequestQueue里面获取pullRequest,如果没有那么将阻塞。(如果不清楚java阻塞的使用,清百度)

执行完一次pullReqeust之后,再继续下一次获取阻塞队列,因为它是个while循环。

所以,我们需要分析下pullRequest放进队列的流程,也就是rebalanceService.

4.5 rebalanceService

public class RebalanceService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
}

MQClientInstance.java

public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}

DefaultMQPushConsumerImpl.java

@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}

RebalanceImpl.java

public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}

this.truncateMessageQueueNotMyTopic();
}

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
....
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

Collections.sort(mqAll);
Collections.sort(cidAll);

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {

this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

一路跟下来,来到了RebalanceImpl.java的rebalanceByTopic方法,这个方法里面有两个case(Broadcasting和Clustering)也就是消息消费的两个模式,广播和集群消息。
广播的话,所有的监听者都会收到消息,集群的话,只有一个消费者可以收到,我们以集群消息为例。
先大概解释下在rebalanceByTopic里面要做什么。

从namesrv获取broker里面这个topic的消费者数量

从namesrv获取broker这个topic的消息队列数量

根据前两部获取的数据进行负载均衡计算,计算出当前消费者客户端分配到的消息队列。

按照分配到的消息队列,去broker请求这个消息队列里面的消息。

上面代码厘米mqset就是这个topic的消费队列,一般是4个,但是这个值是可以修改的,存储的位置在~/store/config/topics.json里面,比如:

"TopicTest":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"topicSysFlag":0,
"writeQueueNums":4
}

可以修改readQueueNums和writeQueueNums为其他值

try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}

这段代码就是客户端根据获取到的这个topic消费者数量和消息队列数量,使用负载均衡策略计算出当前客户端能够使用的消息队列。
负载均衡策略代码在这个位置。



那我们继续4.4 pullMessageService.start分析,因为rebalanceService已经把pullRequest放到了阻塞队列。

4.6 PullMessageService.run

@Override
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {

}
}
}

private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {

}
}

调用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)这个方法里面。

4.6.1

public void pullMessage(final PullRequest pullRequest) {
...

final long beginTimestamp = System.currentTimeMillis();

PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
System.out.printf("pullcallback onsuccess: " + pullResult + " %n");
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
}
break;
}
}
}

@Override
public void onException(Throwable e) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};

try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}

上面这段代码主要就是设置消息获取后的回调函数PullCallback pullCallback,然后调用pullAPIWrapper.pullKernelImpl去Broker里面获取消息。

获取成功后,就会回调pullCallback的onSuccess方法的FOUND case分支。

在pullCallback的onSucess方法的FOUND case分支,会根据回调是同步还是异步,分为两种情况,如下:



同步消息和异步消息区别的源代码实现以后再讲。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rocketmq