rocketMq-Producer原理源码分析
producer的源码结构如下:
我们通常使用mq发送消息,实例化producer的方式就是:
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName"); producer.setNamesrvAddr(namesrv); producer.start(); producer.send(msg);[p] 所以就从DefaultMQProducer 开始说起吧:一、DefaultMQProducer继承ClientConfig类同时实现了MQProducer接口,同时包含一个重要的属性
public class DefaultMQProducer extends ClientConfig implements MQProducer { protected final transient DefaultMQProducerImpl defaultMQProducerImpl;DefaultMQProducer作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒)
- 重要字段
1 String producerGroup 生产者的组名。
一个jvm内,具有相同producerGroup名字的生产者实例只有一个。
2 retryAnotherBrokerWhenNotStoreOK
消息没有存储成功是否发送到另外一个broker.
3 sendMsgTimeout
发送消息超时时间,默认为3秒 - 重要方法
send(Message msg)
发送消息,调用DefaultMQProducerImpl .send()发送
amesrvAddr //namesrv地址列表,多个地址列表用分号隔开 clientIP //本机ip instanceName //客户端实例名称,客户端创建的多个producer,consumer实际上共用的一个内部实例(包含网络数据和线程资源) clientCallbackExecutorThreads //通信层异步回调线程数 pollNameServerInterval //轮询namesrv时间间隔 默认30秒 heartbeatBrokerInterval //向broker发送心跳时间间隔 默认30秒 persistConsumerOffsetInterval //持久化consumer消费进度的时间间隔包括我们的producer.setNamesrvAddr(namesrv);其实就是调用了ClientConfig.setNamesrvAddr(namesrv); 当然,里面还包括了一些不常用,但也很重要的方法,比如[/p]
ClientConfig.resetClientConfig(final ClientConfig cc) //自定义一个客户端配置 ClientConfig.cloneClientConfig() //克隆一个已有的客户端配置
至于MQProducer接口就不用说了,定义了一组接口,例如 start(); //启动 producersend (final Message msg, final SendCallback sendCallback, final long timeout) ; //发送消息 .........DefaultMQProducerImpl:非常重要的一个方法,start(),也就是producer的启动方法,该方法的大致流程如下:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: 1、 this.serviceState = ServiceState.START_FAILED; 2、 this.checkConfig(); 3、 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); 4、 } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); 5、 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); 6、 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); 7、 if (startFactory) { mQClientFactory.start(); 8、 } this.serviceState = ServiceState.RUNNING; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 9、
1、检查DefaultMQProducerImpl.ServiceState的状态(初始化状态为ServiceState.CREATE_JUST);只有状态为CREATE_JUST时才启动该Producer;其他状态均不执行启动过程;2、将DefaultMQProducerImpl.ServiceState置为start_failed,以免客户端同一个进程中重复启动;3、检查producerGroup是否合法:不能为空、不能有非法字符、长度不能大于255、不能等于"DEFAULT_PRODUCER";若不合法则直接向应用层抛出MQClientException异常;若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);4、this.defaultMQProducer.changeInstanceNameToPID(); 若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);
5、创建一个客户端实例
String clientId = clientConfig.buildMQClientId(); //构建该Producer的ClientID,等于IP地址@instanceName; instance = new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);[p]创建MQClientInstance对象。先检查单例对象MQClientManager的factoryTable:ConcurrentHashMap<String/* clientId */, MQClientInstance>变量中是否存在该ClientID的对象,若存在则直接返回该MQClientInstance对象,若不存在,则创建MQClientInstance对象,并以该ClientID为key值将新创建的MQClientInstance对象存入并返回,将返回的MQClientInstance对象赋值给DefaultMQProducerImpl.mQClientFactory变量;说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象;在初始化MQClientInstance对象的过程中,会做如下操作:
this.clientRemotingProcessor = new ClientRemotingProcessor(this); 5.1、 this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); 5.2、 if (this.clientConfig.getNamesrvAddr() != null) { 5.3、 this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr()); } this.pullMessageService = new PullMessageService(this); 5.4、 this.rebalanceService = new RebalanceService(this); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); 5.5、5.1)初始化ClientRemotingProcessor对象,处理接受的事件请求; 5.2)初始化MQClientAPIImpl对象,在初始化过程中,初始化了MQClientAPIImpl.remotingClient:NettyRemotingClient对象,将ClientRemotingProcessor对象作为事件处理器注册到NettyRemotingClient对象中,处理的事件号有:CHECK_TRANSACTION_STATE、NOTIFY_CONSUMER_IDS_CHANGED、RESET_CONSUMER_CLIENT_OFFSET、GET_CONSUMER_STATUS_FROM_CLIENT、GET_CONSUMER_RUNNING_INFO、CONSUME_MESSAGE_DIRECTLY。 5.3)若ClientConfig.namesrvAddr不为空,则拆分成数组存入MQClientAPIImpl.remotingClient变量中; 5.4)初始化PullMessageService、RebalanceService、ConsumerStatsManager服务线程;PullMessageService服务线程是供DefaultMQPushConsumer端使用的,RebalanceService服务线程是供Consumser端使用的; 5.5)初始化producerGroup等于"CLIENT_INNER_PRODUCER"的DefaultMQProducer对象; 6、将DefaultMQProducerImpl对象在MQClientInstance中注册,以producerGroup为key值、DefaultMQProducerImpl对象为values值存入MQClientInstance.producerTable:ConcurrentHashMap<String/* group */, MQProducerInner>变量中,若在该变量中已存在该producerGroup的记录则向应用层抛出MQClientException异常;说明在一个客户端的一个进程下面启动多个Producer时producerGroup名字不能一样,否则无法启动;7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中;(正常情况下使用命令在集群上建立topic变回保存在这里。)8、mQClientFactory.start();用MQClientInstance.start方法启动MQClientInstance对象;
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); 8.1、 } this.mQClientAPIImpl.start(); 8.2、 this.startScheduledTask(); 8.3、 this.serviceState = ServiceState.RUNNING 8.4、 this.pullMessageService.start(); 8.5、 this.rebalanceService.start(); 8.6、 this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }8.1)关于namesrvAddr的设置一般在两个地方,还有一个是在环境变量里设置.如果这两个地方都没有设置会走该方法.一个是在创建producer时候设置
producer.setNamesrvAddr(MyUtils.getNamesrvAddr())另一个是在环境变量里设置而
fetchNameServerAddr()是第三种调用http接口去寻址.需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址. 8.2)客户端netty启动。 8.3)this.startScheduledTask();启动各种任务调度 8.3.1)从NameSrv遍历TopicRouteInfo(Topic的路由信息有brokerName,queueId组成),然后更新producer和consumer的topic信息 【30秒一次】
8.3.2)清理离线的broker 【30秒一次】 8.3.3)向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息 【30秒一次】 8.3.4)持久化consumer消费进度 【5秒一次】 8.3.5)启动线程池线程数调整线程。 【每分钟调整一次】 8.3.6)this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 8.3.8)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动; 8.4)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动; 8.5)启动拉消息服务PullMessageService。 8.6)启动消费端负载均衡服务RebalanceService9、调用MQClientInstance.sendHeartbeatToAllBrokerWithLock()方法,向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息(consumer中有详细描述)。[/p]
此,简单总结一下producer的启动过程,只对状态为CREATE_JUST的DefaultMQProducerImpl执行启动过程,首先检查producerGroup合法性,构建producer的instanceName为进程号,构建producer的clientId为ip@instanceName, 创建MQClientInstance对象,期间所做的事为5.1-5.5,不再赘述。向MQClientInstance中注册DefaultMQProducerImpl,给topicPublishInfoTable添加一个初始值。用MQClientInstance.start方法启动MQClientInstance对象,期间所做的事为8.1-8.6,向Broker发送心跳消息。大概如下流程图:
producer启动以后自然就是向broker发送消息了,下面看看producer向broker发送消息的具体流程 rocketMq支持producer以三种方式发送消息到broker: 可靠同步发送:
SendResult sendResult = producer.send(msg);可靠异步发送:
producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { .......} @Override public void onException(Throwable e) {..........} });单向传输:
producer.sendOneway(msg);无论哪一种发送方式,只要我们没有明确指明超时时间,那就会使用默认的超时时间(30秒)private int sendMsgTimeout = 3000;下面我们以可靠同步发送为例:producer.send(msg);实际调用的是DefaultMQProducerImpl.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); 在该方法中会执行以下操作(省略了非关键步骤)
this.makeSureStateOK(); 1、 Validators.checkMessage(msg, this.defaultMQProducer); 2、 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 3、 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; 4、 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); 5、 if (mqSelected != null) { mq = mqSelected; sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 6、 } }1、检查producer是否成功启动,this.serviceState = ServiceState.RUNNING代表producer已经成功启动,否则抛出异常throw new MQClientException("The producer service state not OK,.....")(我们见过这个异常) 2、参数检查,Message的长度大小,topic的非法字符,判空,长度等进行判断。 3、根据传入的topic从DefaultMQProducerImpl的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable获取指定的TopicPublishInfo。 2.1)若没有获取到对应的TopicPublishInfo,就是我们实际情况中没有在集群上建立topic就往这个topic上发消息,那么此时不会报错,会建立该topic对应的TopicPublishInfo,然后返回,即topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());但是这会导致两个问题1、 http://www.mamicode.com/info-detail-327693.html 2、此时的topic是随机建立在一个broker上的,以后该producer的消息都会发送到这个broker上,就做不到负载均衡了。因此建议配置autoCreateTopicEnable=false,这样如果发送时的topic没有建立就会报错。 2.2)若有对应的TopicPublishInfo,则返回该topic对应的TopicPublishInfo。 4、根据发送方式设置失败重发次数,sync=3,async=1,sendOneWay=1。 5、选择一个发送队列。int pos = Math.abs(index++) % this.messageQueueList.size();每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询,如果入参lastBrokerName不为空,代表上次选择的queue发送失败,这次选择应该避开同一个queue【范围是仅该条消息】。 6、进行真正的消息发送!
if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); 6.1、 brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); 6.2、 byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } int sysFlag = 0; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; } if (hasCheckForbiddenHook()) { 6.3、 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); 6.4、 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 6.5、 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( 6.6、 brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); return sendResult; finally { msg.setBody(prevBody); } }6.1)String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 获取broker地址,其中比较细节的是如果没获取到地址,会调用tryToFindTopicPublishInfo(mq.getTopic());从namesrv更新topic路由后再次获取。 6.2)此时获取的地址形如10.3.253.227:10911,rocketMq默认开启VipChannel所以发送的时候是发送到10.3.253.227:10909.新的addr是调用brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);得到的。这里能解释一个问题,当时测试的时候往pim的mq发消息,就是报错,跟10911相关的,默认开启VipChannel,所以发送到了10909上,所以需要手动加上一行代码producer.setVipChannelEnabled(false); 6.3)依次检查producer客户端是否设置了ForbiddenContext和SendMessageHook,如果有则执行对应方法(个人感觉这个类似于切面):this.executeCheckForbiddenHook(checkForbiddenContext); this.executeSendMessageHookBefore(context); 6.4)之后根据之前得到的一系列发送消息的配置,来构造发送给Broker的请求头数据。 6.5)如果请求头的topic是以%RETRY%开头的,就再给请求头额外配置重新消费次数和最大重新消费次数 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); 6.6)
request.setBody(msg.getBody()); 6.6.1、 this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); 6.6.2、 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { 6.6.3 @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); 6.6.5 } }); this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); 6.6.4、 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { //netty相关 @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } });6.6.1)构造request的code和body(实际发送的消息内容)【Byte类型】 6.6.2)调用SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) 方法 6.6.3)进而调用RemotingClient.invokeSync(addr, request, timeoutMillis); 6.6.4)进而调用NettyRemotingClient.invokeSyncImpl(channel, request, timeoutMillis)给broker发送消息,实际是netty间的通信channel.writeAndFlush(request),并且在超时时间内等待返回结果 6.6.5)根据服务端的响应结果,调用processSendResponse(brokerName, msg, response);组装SendResult。 最后将SendResult层层返回,就回到了最初的样子 SendResult sendResult = producer.send(msg); 其中有一个问题还没理解,使用Netty传输POJO对象,重点在于对象的序列化。序列化的对象通过TCP进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。首先Java需要序列化的对象,需要实现java.io.Serializable接口。可以看到Message对象是实现该接口了的。但是发送消息时实际发送的对象是RemotingCommand,但是他并没有序列化。这是为什么呢?
最后简单总结一下producer发送消息的过程(以sync为例):检查producer状态,检查参数,获取topic的路由信息,选择发送队列,获取broker的地址,配置request请求头,给request添加code值和body的内容,使用netty将从NettyRemotingClient【netty服客户端】消息传输到broker(NettyRemotingServer【netty服务端】),等待响应结果,组装SendResult 并返回。大致如下流程:
阅读更多
- 源码分析RocketMQ顺序消息消费实现原理
- RocketMQ源码分析(二)Producer端发送数据
- spring源码分析之——spring aop原理
- HDFS2.X源码分析之:NameNode写文件原理
- Linux网络编程【四】:进程池、线程池原理及简单线程池源码分析
- Java多线程之Condition实现原理和源码分析(四)
- RocketMQ原理解析-producer 1.启动流程
- LIRe图像检索:CEDD算法原理与源码分析
- 【OpenCV】SIFT原理与源码分析
- Android源码解析之新进程中启动自定义服务过程(startService)的原理分析
- opencv中BackgroundSubtractorMOG2源码分析与原理讲解
- SIFT算法原理与OpenCV源码分析1:SIFT简介
- mahout实现的模糊K-Means聚类算法原理和源码分析
- 主引导记录工作原理(源码分析很COOL)
- HashMap实现原理及源码分析
- Hadoop之HDFS原理及文件上传下载源码分析(下)
- ConcurrentHashMap实现原理及源码分析
- springmvc源码分析原理及简单实现
- u-boot源码配置原理分析
- 【Cocos2d-x 3.x】 场景切换生命周期、背景音乐播放和场景切换原理与源码分析