RocketMQ源码:Producer启动分析
2018-09-17 02:58
1246 查看
本文主要分析RocketMQ中Producer的启动过程。
RocketMQ的版本为:4.2.0 release。
上面就是RocketMQ中Producer的启动过程,上面分析了主要的几处地方,如果想了解启动过程中的详细代码,可以从Github上面clone代码到本地,试着调试和分析。附上地址:4.2.0 release。
RocketMQ的版本为:4.2.0 release。
一.时序图
根据源码,把Producer启动过程的时序图画了一遍:二.源码分析
1 start() :DefaultMQProducer启动。
DefaultMQProducer主要功能都是在DefaultMQProducerImpl中实现的。类似的,DefaultMQPushConsumer的大部分功能也在DefaultMQPushConsumerImpl中实现://DefaultMQProducer#start public void start() throws MQClientException { this.defaultMQProducerImpl.start(); }
1.1 checkConfig:检查producerGroup是否合法。
// DefaultMQProducerImpl#checkConfig private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); if (null == this.defaultMQProducer.getProducerGroup()) { throw new MQClientException("producerGroup is null", null); } if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {// 不能等于"DEFAULT_PRODUCER" throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", null); } } // Validators#checkGroup public static void checkGroup(String group) throws MQClientException { if (UtilAll.isBlank(group)) {// 不能为空 throw new MQClientException("the specified group is blank", null); } if (!regularExpressionMatcher(group, PATTERN)) { throw new MQClientException(String.format( "the specified group[%s] contains illegal characters, allowing only %s", group, VALID_PATTERN_STR), null); } if (group.length() > CHARACTER_MAX_LENGTH) {// 长度不能大于255 throw new MQClientException("the specified group is longer than group max length 255.", null); } }
1.2 getAndCreateMQClientInstance:获取MQClientInstance。
//MQClientManager#getAndCreateMQClientInstance public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId();// 构建该Producer的ClientID,等于IP地址@instanceName MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) {// 如果当前客户端不在mq客户端实例集合中,则创建一个实例并加入 instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) {// 说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象 instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
1.3 registerProducer:注册Producer。
// MQClientInstance#registerProducer public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);// 如果没有添加过,就往producerTable中加入当前的Producer if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; }
1.4 MQClientInstance#start 启动mQClientFactory。
// DefaultMQProducerImpl#start(true) if (startFactory) { mQClientFactory.start(); } // MQClientInstance#start public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr();// 获取nameService地址 } // Start request-response channel this.mQClientAPIImpl.start();// 对象负责底层消息通信,获取nameService地址 // Start various schedule tasks this.startScheduledTask();// 启动各种定时任务 // Start pull service this.pullMessageService.start();// 启动拉取消息服务 // Start rebalance service this.rebalanceService.start();// 启动消费端负载均衡服务 // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false);// 再次调用DefaultMQProducerImpl.start(),注意传参为false。此时ServiceState还是 START_FAILED 只调用了一次心跳服务 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock() log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING;// 改变serviceState状态为 RUNNING 启动状态 break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
1.4.1 MQClientAPIImpl#start 负责底层消息通信,启动客户端对象。
// MQClientAPIImpl#start public void start() { this.remotingClient.start();// RemotingClient是RocketMQ封装了Netty网络通信的客户端 }
1.4.2 MQClientInstance#startScheduledTask 启动各种定时任务。
// MQClientInstance#startScheduledTask private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();// 更新NameServer地址 } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer();// 从nameService更新Topic路由信息 } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker();// 清理挂掉的broker MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();// 向broker发送心跳信息 } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset();// 持久化consumerOffset,保存消费者的Offset } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool();// 调整消费线程池 } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
1.5 MQClientInstance#sendHeartbeatToAllBrokerWithLock 向所有的Broker发送心跳信息。
// MQClientInstance#sendHeartbeatToAllBrokerWithLock public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker();// 向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息 this.uploadFilterClassSource();// 向Filter过滤服务器发送REGISTER_MESSAGE_FILTER_CLASS请求码,更新过滤服务器中的Filterclass文件 } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } }
上面就是RocketMQ中Producer的启动过程,上面分析了主要的几处地方,如果想了解启动过程中的详细代码,可以从Github上面clone代码到本地,试着调试和分析。附上地址:4.2.0 release。
相关文章推荐
- 查看RocketMQ的broker启动部分源码分析总结
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- RocketMQ源码:有序消息分析
- 源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-上篇
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点
- RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考
- Rocket MQ consumer 源码分析(绝对干货)
- 源码分析RocketMQ之消息消费重试机制
- 源码分析RocketMQ之消息ACK机制(消费进度)
- RocketMQ原理解析-producer 1.启动流程
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- RocketMQ源码分析----Broker处理发送请求
- 源码分析RocketMQ文件清除机制
- RocketMQ runbroker.sh 分析JVM启动参数
- 源码分析RocketMQ之CommitLog消息存储机制
- rocketmq 4.0 Broker启动流程分析
- RocketMQ源码解析-Consumer启动(2)
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId