您的位置:首页 > 运维架构 > Apache

RocketMQ源码:Producer启动分析

2018-09-17 02:58 1246 查看
本文主要分析RocketMQ中Producer的启动过程。

RocketMQ的版本为:4.2.0 release。

一.时序图

根据源码,把Producer启动过程的时序图画了一遍:



二.源码分析

1 start() :DefaultMQProducer启动。

DefaultMQProducer主要功能都是在DefaultMQProducerImpl中实现的。类似的,DefaultMQPushConsumer的大部分功能也在DefaultMQPushConsumerImpl中实现:

//DefaultMQProducer#start
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}


1.1 checkConfig:检查producerGroup是否合法。

// DefaultMQProducerImpl#checkConfig
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null);
}
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {// 不能等于"DEFAULT_PRODUCER"
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
null);
}
}
// Validators#checkGroup
public static void checkGroup(String group) throws MQClientException {
if (UtilAll.isBlank(group)) {// 不能为空
throw new MQClientException("the specified group is blank", null);
}
if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group,
VALID_PATTERN_STR), null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {// 长度不能大于255
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
}


1.2 getAndCreateMQClientInstance:获取MQClientInstance。

//MQClientManager#getAndCreateMQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();// 构建该Producer的ClientID,等于IP地址@instanceName
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {// 如果当前客户端不在mq客户端实例集合中,则创建一个实例并加入
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {// 说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}


1.3 registerProducer:注册Producer。

// MQClientInstance#registerProducer
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);// 如果没有添加过,就往producerTable中加入当前的Producer
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}


1.4 MQClientInstance#start 启动mQClientFactory。

// DefaultMQProducerImpl#start(true)
if (startFactory) {
mQClientFactory.start();
}
// MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();// 获取nameService地址
}
// Start request-response channel
this.mQClientAPIImpl.start();// 对象负责底层消息通信,获取nameService地址
// Start various schedule tasks
this.startScheduledTask();// 启动各种定时任务
// Start pull service
this.pullMessageService.start();// 启动拉取消息服务
// Start rebalance service
this.rebalanceService.start();// 启动消费端负载均衡服务
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);// 再次调用DefaultMQProducerImpl.start(),注意传参为false。此时ServiceState还是 START_FAILED 只调用了一次心跳服务 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;// 改变serviceState状态为 RUNNING 启动状态
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}


1.4.1 MQClientAPIImpl#start 负责底层消息通信,启动客户端对象。

// MQClientAPIImpl#start
public void start() {
this.remotingClient.start();// RemotingClient是RocketMQ封装了Netty网络通信的客户端
}


1.4.2 MQClientInstance#startScheduledTask 启动各种定时任务。

// MQClientInstance#startScheduledTask
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();// 更新NameServer地址
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();// 从nameService更新Topic路由信息
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();// 清理挂掉的broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();// 向broker发送心跳信息
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();// 持久化consumerOffset,保存消费者的Offset
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();// 调整消费线程池
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}


1.5 MQClientInstance#sendHeartbeatToAllBrokerWithLock 向所有的Broker发送心跳信息。

// MQClientInstance#sendHeartbeatToAllBrokerWithLock
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();// 向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息
this.uploadFilterClassSource();// 向Filter过滤服务器发送REGISTER_MESSAGE_FILTER_CLASS请求码,更新过滤服务器中的Filterclass文件
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}


上面就是RocketMQ中Producer的启动过程,上面分析了主要的几处地方,如果想了解启动过程中的详细代码,可以从Github上面clone代码到本地,试着调试和分析。附上地址:4.2.0 release
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Apache RocketMQ