您的位置:首页 > 其它

RocketMQ源码解析-Consumer启动(1)

2017-10-16 23:37 751 查看
DefaultMQPullConsumer继承了ClientConfig类,作为主动拉获取消息的消费者实现接口的管理与相关属性的配置(与PushConsumer对应)。相比生产者,消费者配置的属性要复杂得多。由于在RocketMQ中,生产者消费者共用一个客户端实现类MQClient,所以在前文中没有解析的方法是属于服务于消费者的。首先以主动拉取方式获得消息的消费者PullConsumer为例子。

在DefaultMQPullConsumer的构造方法中,实现的跟生产者一样简单,只是简单的配置了传入了的ConsumerGroup用以确认该消费者究竟是处于哪个消费者组名,然后调用DefaultMQPullConsumerImpl的构造方法。而DefaultMQPullConsumerImpl的构造方法也是非常简单,只是配置了调用他的消费者配置类,以及传进来的rpcHook。

既然DefaultMQPullConsumer在整个过程中充当着配置者的角色,那么显然可以直接在这里配置消费者相关的订阅的topic,维护着一个set用来存放订阅的topic。

private Set<String> registerTopics = new HashSet<String>();


 

整个消费者的启动由调用DefaultMQPullConsumer的start()方法开始。而在其中的start()也只是简单的调用了DefaultMQPullConsumerImpl的start(),其他全无任何额外操作,那么就可以直接从DefaultMQPullConsumerImpl的start()方法开始看起。

public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

this.copySubscription();

if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}

this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer,
this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(//
mQClientFactory,//
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
}

this.offsetStore.load();

boolean registerOK =
mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;

throw new MQClientException("The consumer group["
+ this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}

mQClientFactory.start();
4000

log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}

一开始DefaultMQPullConsumerImpl的状态量就是CREAT_JUST,所以进入下面的开始阶段。

在checkConfig()方法里先是简单的对消费者的ConsumerGroup进行检查,防止为空或者非法,接下来也会防止与默认的消费者集群名冲突。接下来将要配置消费者的通知方式(MessageModel),在MQ的消费者当中,通知方式分为BROADCASTING(广播模式),以及CLUSTERINT(集群模式),默认为集群模式配置在DefaultMQPullConsumer,接下来先以集群模式为例子接下去消费者的启动。

 

消费者的消息队列分配策略

然后将会对消息队列分配策略进行检查。在DefaultMQPullConsumer中实现有默认的消息队列分配策略(Average平均分配策略),消息队列分配策略都实现了AllocateMessageQueneStrategy接口实现了相应的allocate()方法。以默认的平均分配方式为例子来看他的allocat()方法。

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}

List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
consumerGroup, //
currentCID,//
cidAll);
return result;
}

int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

在平均分配消息队列的方式下,传入的参数有当前消费者id,当前所有消费者数组,以及所有消息队列数组。在平均分配的策略下,将会根据消费者数组的大小以及消息队列数组的大小,以及该生产者在消费者数组的位置确定具体的生产者获取该消费者数组的哪一部分消费者队列。

而RocketMQ给出的分配策略还有循环平均分配,按照配置分配,按照机房分配等分配方式,这里采用默认的平均分配。

在检查完上述的消费者集群名,消息通知方式,消息队列分配方式之后,checkConfig()方法结束。

 

接下里将会调用copySubscription()方法,将DefaultMQPullConsumer里配置的注册topic复制过来并抽象成消费者具体能够接受的形式。

private void copySubscription() throws MQClientException {
try {
Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

在这里会根据每一个注册在DefaultMQPusllConsumer里的topic,创建SubscriptionData来完成topic数据的转变。

public final static String SUB_ALL = "*";
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();

从SubscriptionData的数据结构可以看出每一个topic对应一个相应的SubscriptionData,里面存储着相应的tag之类的数据。在创建完毕之后,DefaultMQPullConsumerImpl里面的rebalanceImpl将会把topic和SubscriptionData作为键值对存放在里面。

rebalanceImpl的作用将会在具体发挥作用的时候解释。

到这里,开始阶段的配置检查与准备告一段落,接下里将会开启消费者客户端。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: