RocketMQ 源码分析(一)
2016-12-22 23:11
405 查看
producer启动流程
参考斩秋的博客
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39141415
进行调试
调试跟进代码,进入了DefaultMQProducerImpl的start方法
第二处方法
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
进入到MQClientInstance的sendHeartbeatToAllBrokerWithLock,方法名顾名思义就是Producer定时发送生产端的信息给broker。mQClientFactory在之前绑定了消费者的groupName和Procuder信息
(2)处的代码应该是过滤消费端的Message,有点不懂为什么在Producer启动的时候过滤,
后来运行了一下RocketMQ的高级过滤demo,就是消费端使用过滤类来过滤消息的时候,debug模式的时候会走这段代码,看RocketMQ的用户手册的原理图,
当消费端启动的时候会将过滤类上传到所有的过滤服务上面。
参考斩秋的博客
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39141415
进行调试
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer"); producer.start();
调试跟进代码,进入了DefaultMQProducerImpl的start方法
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; //校验ground Name this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } //创建一个MQ工厂类 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); //producerTable注册生产方的用户组和Producer boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable .put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { //这个就是对应的一些Service的start mQClientFactory.start();(1) } log.info("the producer [{}] start OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, "// + this.serviceState// + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); (2) }
里面最重要的二个方法 mQClientFactory.start();(1) public void start() throws MQClientException { PackageConflictDetect.detectFastjson(); synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; //If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr()); } //Start request-response channel this.mQClientAPIImpl.start(); //Start various schedule tasks //定时获取nameserver信息 this.startScheduledTask(); //Start pull service this.pullMessageService.start(); //Start rebalance service //复杂均衡启动 this.rebalanceService.start(); //Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
第二处方法
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
进入到MQClientInstance的sendHeartbeatToAllBrokerWithLock,方法名顾名思义就是Producer定时发送生产端的信息给broker。mQClientFactory在之前绑定了消费者的groupName和Procuder信息
public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); this.uploadFilterClassSource();(2) } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } }
(2)处的代码应该是过滤消费端的Message,有点不懂为什么在Producer启动的时候过滤,
private void uploadFilterClassSource() { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> next = it.next(); MQConsumerInner consumer = next.getValue(); if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) { Set<SubscriptionData> subscriptions = consumer.subscriptions(); for (SubscriptionData sub : subscriptions) { if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) { final String consumerGroup = consumer.groupName(); final String className = sub.getSubString(); final String topic = sub.getTopic(); final String filterClassSource = sub.getFilterClassSource(); try { this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource); } catch (Exception e) { log.error("uploadFilterClassToAllFilterServer Exception", e); } } } } } }
后来运行了一下RocketMQ的高级过滤demo,就是消费端使用过滤类来过滤消息的时候,debug模式的时候会走这段代码,看RocketMQ的用户手册的原理图,
当消费端启动的时候会将过滤类上传到所有的过滤服务上面。
相关文章推荐
- RocketMQ源码分析—CommitLog类源码分析
- RocketMQ源码分析(二)Producer端发送数据
- 源码分析RocketMQ消息消费机制----消费端消息负载均衡机制与重新分布
- RocketMQ源码分析之Message消费与拉取(下Consume的拉取消费)
- 源码分析RocketMQ消息过滤机制下篇-FilterServer、ClassFilter模式详解
- RocketMQ 源码分析
- RocketMQ源码分析之Message存储
- 源码分析RocketMQ顺序消息消费实现原理
- RocketMQ 源码分析 —— 高可用
- RocketMQ源码分析之顺序消费
- Rocketmq之namesrv启动流程源码详解分析
- rocketmq-remoting 源码分析NettyRemotingServer
- 阿里消息队列中间件 RocketMQ 源码分析 —— Message 拉取与消费(上)
- RocketMQ源码分析----消费消息
- 源码分析RocketMQ消息消费机制----消费者拉取消息机制
- RocketMQ client客户端模块源码分析一(生产者)
- RocketMQ源码分析----消息存储
- RocketMQ源码分析----刷盘的实现
- 源码分析RocketMQ消息过滤机制上篇-----消息消费服务端过滤与TAG模式过滤实现
- 源码分析RocketMQ消息PULL-长轮询模式