您的位置:首页 > 其它

RocketMQ 源码分析(一)

2016-12-22 23:11 405 查看
producer启动流程

参考斩秋的博客

http://blog.csdn.net/quhongwei_zhanqiu/article/details/39141415

进行调试

DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
producer.start();


调试跟进代码,进入了DefaultMQProducerImpl的start方法

public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

//校验ground Name
this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

//创建一个MQ工厂类
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,
rpcHook);

//producerTable注册生产方的用户组和Producer
boolean registerOK =
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}

this.topicPublishInfoTable
.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
//这个就是对应的一些Service的start
mQClientFactory.start();(1)
}

log.info("the producer [{}] start OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); (2)
}


里面最重要的二个方法
mQClientFactory.start();(1)
public void start() throws MQClientException {
PackageConflictDetect.detectFastjson();

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
}
//Start request-response channel
this.mQClientAPIImpl.start();
//Start various schedule tasks
//定时获取nameserver信息
this.startScheduledTask();
//Start pull service
this.pullMessageService.start();
//Start rebalance service
//复杂均衡启动
this.rebalanceService.start();
//Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId()
+ "] has been created before, and failed.", null);
default:
break;
}
}
}


第二处方法

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

进入到MQClientInstance的sendHeartbeatToAllBrokerWithLock,方法名顾名思义就是Producer定时发送生产端的信息给broker。mQClientFactory在之前绑定了消费者的groupName和Procuder信息

public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();(2)
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}


(2)处的代码应该是过滤消费端的Message,有点不懂为什么在Producer启动的时候过滤,

private void uploadFilterClassSource() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> next = it.next();
MQConsumerInner consumer = next.getValue();
if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
Set<SubscriptionData> subscriptions = consumer.subscriptions();
for (SubscriptionData sub : subscriptions) {
if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
final String consumerGroup = consumer.groupName();
final String className = sub.getSubString();
final String topic = sub.getTopic();
final String filterClassSource = sub.getFilterClassSource();
try {
this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic,
filterClassSource);
} catch (Exception e) {
log.error("uploadFilterClassToAllFilterServer Exception", e);
}
}
}
}
}
}


后来运行了一下RocketMQ的高级过滤demo,就是消费端使用过滤类来过滤消息的时候,debug模式的时候会走这段代码,看RocketMQ的用户手册的原理图,





当消费端启动的时候会将过滤类上传到所有的过滤服务上面。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: