您的位置:首页 > 其它

rocketMq-Producer原理源码分析

2018-05-06 13:53 513 查看

producer的源码结构如下:


 我们通常使用mq发送消息,实例化producer的方式就是:

DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
producer.setNamesrvAddr(namesrv);
producer.start();
producer.send(msg);
[p] 所以就从DefaultMQProducer 开始说起吧:一、DefaultMQProducer继承ClientConfig类同时实现了MQProducer接口,同时包含一个重要的属性
public class DefaultMQProducer extends ClientConfig implements MQProducer {
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
DefaultMQProducer作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒)

  • 重要字段 
    1 String producerGroup 生产者的组名。 
    一个jvm内,具有相同producerGroup名字的生产者实例只有一个。 
    2 retryAnotherBrokerWhenNotStoreOK 
    消息没有存储成功是否发送到另外一个broker. 
    3 sendMsgTimeout 
    发送消息超时时间,默认为3秒
  • 重要方法 
    send(Message msg) 
    发送消息,调用DefaultMQProducerImpl .send()发送
        ClientConfig类就是客户端的公共配置参数说明:
amesrvAddr   //namesrv地址列表,多个地址列表用分号隔开
clientIP   //本机ip
instanceName  //客户端实例名称,客户端创建的多个producer,consumer实际上共用的一个内部实例(包含网络数据和线程资源)
clientCallbackExecutorThreads   //通信层异步回调线程数
pollNameServerInterval   //轮询namesrv时间间隔  默认30秒
heartbeatBrokerInterval   //向broker发送心跳时间间隔  默认30秒
persistConsumerOffsetInterval   //持久化consumer消费进度的时间间隔
包括我们的producer.setNamesrvAddr(namesrv);其实就是调用了ClientConfig.setNamesrvAddr(namesrv);    当然,里面还包括了一些不常用,但也很重要的方法,比如[/p]
ClientConfig.resetClientConfig(final ClientConfig cc)      //自定义一个客户端配置
ClientConfig.cloneClientConfig()      //克隆一个已有的客户端配置

至于MQProducer接口就不用说了,定义了一组接口,例如      start();   //启动      producersend (final Message msg, final SendCallback sendCallback, final long timeout) ;   //发送消息      .........DefaultMQProducerImpl:非常重要的一个方法,start(),也就是producer的启动方法,该方法的大致流程如下:


public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:                                     1、
this.serviceState = ServiceState.START_FAILED;    2、
this.checkConfig();                               3、
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();   4、
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);   5、
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);             6、
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());                 7、
if (startFactory) {
mQClientFactory.start();               8、
}
this.serviceState = ServiceState.RUNNING;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();  9、

1、检查DefaultMQProducerImpl.ServiceState的状态(初始化状态为ServiceState.CREATE_JUST);只有状态为CREATE_JUST时才启动该Producer;其他状态均不执行启动过程;2、将DefaultMQProducerImpl.ServiceState置为start_failed,以免客户端同一个进程中重复启动;3、检查producerGroup是否合法:不能为空、不能有非法字符、长度不能大于255、不能等于"DEFAULT_PRODUCER";若不合法则直接向应用层抛出MQClientException异常;若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);4、this.defaultMQProducer.changeInstanceNameToPID();   若producerGroup不等于"CLIENT_INNER_PRODUCER"则设置Producer的实例名(instanceName);调用java的ManagementFactory.getRuntimeMXBean()方法获取该进程的PID作为该Producer的实例名(instanceName);
5、创建一个客户端实例

String clientId = clientConfig.buildMQClientId();     //构建该Producer的ClientID,等于IP地址@instanceName;
instance = new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
[p]创建MQClientInstance对象。先检查单例对象MQClientManager的factoryTable:ConcurrentHashMap<String/* clientId */, MQClientInstance>变量中是否存在该ClientID的对象,若存在则直接返回该MQClientInstance对象,若不存在,则创建MQClientInstance对象,并以该ClientID为key值将新创建的MQClientInstance对象存入并返回,将返回的MQClientInstance对象赋值给DefaultMQProducerImpl.mQClientFactory变量;说明一个IP客户端下面的应用,只有在启动多个进程的情况下才会创建多个MQClientInstance对象;在初始化MQClientInstance对象的过程中,会做如下操作:
this.clientRemotingProcessor = new ClientRemotingProcessor(this);   5.1、

this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);    5.2、

if (this.clientConfig.getNamesrvAddr() != null) {                   5.3、
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.pullMessageService = new PullMessageService(this);                  5.4、
this.rebalanceService = new RebalanceService(this);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);  5.5、
 5.1)初始化ClientRemotingProcessor对象,处理接受的事件请求;   5.2)初始化MQClientAPIImpl对象,在初始化过程中,初始化了MQClientAPIImpl.remotingClient:NettyRemotingClient对象,将ClientRemotingProcessor对象作为事件处理器注册到NettyRemotingClient对象中,处理的事件号有:CHECK_TRANSACTION_STATE、NOTIFY_CONSUMER_IDS_CHANGED、RESET_CONSUMER_CLIENT_OFFSET、GET_CONSUMER_STATUS_FROM_CLIENT、GET_CONSUMER_RUNNING_INFO、CONSUME_MESSAGE_DIRECTLY。    5.3)若ClientConfig.namesrvAddr不为空,则拆分成数组存入MQClientAPIImpl.remotingClient变量中;    5.4)初始化PullMessageService、RebalanceService、ConsumerStatsManager服务线程;PullMessageService服务线程是供DefaultMQPushConsumer端使用的,RebalanceService服务线程是供Consumser端使用的;    5.5)初始化producerGroup等于"CLIENT_INNER_PRODUCER"的DefaultMQProducer对象; 6、将DefaultMQProducerImpl对象在MQClientInstance中注册,以producerGroup为key值、DefaultMQProducerImpl对象为values值存入MQClientInstance.producerTable:ConcurrentHashMap<String/* group */, MQProducerInner>变量中,若在该变量中已存在该producerGroup的记录则向应用层抛出MQClientException异常;说明在一个客户端的一个进程下面启动多个Producer时producerGroup名字不能一样,否则无法启动;7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中;(正常情况下使用命令在集群上建立topic变回保存在这里。)8、mQClientFactory.start();用MQClientInstance.start方法启动MQClientInstance对象;


public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();    8.1、
}
this.mQClientAPIImpl.start();        8.2、
this.startScheduledTask();           8.3、
this.serviceState = ServiceState.RUNNING   8.4、
this.pullMessageService.start();      8.5、
this.rebalanceService.start();      8.6、
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
 8.1)关于namesrvAddr的设置一般在两个地方,还有一个是在环境变量里设置.如果这两个地方都没有设置会走该方法.一个是在创建producer时候设置
producer.setNamesrvAddr(MyUtils.getNamesrvAddr()) 
另一个是在环境变量里设置而
fetchNameServerAddr()
是第三种调用http接口去寻址.需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址.         8.2)客户端netty启动。         8.3)this.startScheduledTask();启动各种任务调度                 8.3.1)从NameSrv遍历TopicRouteInfo(Topic的路由信息有brokerName,queueId组成),然后更新producer和consumer的topic信息     【30秒一次】
              8.3.2)清理离线的broker        【30秒一次】              8.3.3)向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息    【30秒一次】              8.3.4)持久化consumer消费进度    【5秒一次】              8.3.5)启动线程池线程数调整线程。   【每分钟调整一次】              8.3.6)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                   8.3.8)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;         8.4)设置DefaultMQProducerImpl的ServiceState为RUNNING,使producer避免重复启动;         8.5)启动拉消息服务PullMessageService。         8.6)启动消费端负载均衡服务RebalanceService9、调用MQClientInstance.sendHeartbeatToAllBrokerWithLock()方法,向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息(consumer中有详细描述)。[/p]

此,简单总结一下producer的启动过程,只对状态为CREATE_JUST的DefaultMQProducerImpl执行启动过程,首先检查producerGroup合法性,构建producer的instanceName为进程号,构建producer的clientId为ip@instanceName,  创建MQClientInstance对象,期间所做的事为5.1-5.5,不再赘述。向MQClientInstance中注册DefaultMQProducerImpl,给topicPublishInfoTable添加一个初始值。用MQClientInstance.start方法启动MQClientInstance对象,期间所做的事为8.1-8.6,向Broker发送心跳消息。大概如下流程图:




producer启动以后自然就是向broker发送消息了,下面看看producer向broker发送消息的具体流程  rocketMq支持producer以三种方式发送消息到broker:  可靠同步发送:

SendResult sendResult = producer.send(msg);
可靠异步发送:
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) { .......}
@Override
public void onException(Throwable e) {..........}
});
单向传输:
producer.sendOneway(msg);
无论哪一种发送方式,只要我们没有明确指明超时时间,那就会使用默认的超时时间(30秒)private int sendMsgTimeout = 3000;下面我们以可靠同步发送为例:producer.send(msg);实际调用的是DefaultMQProducerImpl.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);   在该方法中会执行以下操作(省略了非关键步骤)
this.makeSureStateOK();      1、

Validators.checkMessage(msg, this.defaultMQProducer);    2、

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    3、

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;    4、

for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);       5、
if (mqSelected != null) {
mq = mqSelected;
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);   6、
}
}
1、检查producer是否成功启动,this.serviceState = ServiceState.RUNNING代表producer已经成功启动,否则抛出异常throw new MQClientException("The producer service state not OK,.....")(我们见过这个异常)    2、参数检查,Message的长度大小,topic的非法字符,判空,长度等进行判断。   3、根据传入的topic从DefaultMQProducerImpl的ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable获取指定的TopicPublishInfo。       2.1)若没有获取到对应的TopicPublishInfo,就是我们实际情况中没有在集群上建立topic就往这个topic上发消息,那么此时不会报错,会建立该topic对应的TopicPublishInfo,然后返回,即topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());但是这会导致两个问题1、 http://www.mamicode.com/info-detail-327693.html   2、此时的topic是随机建立在一个broker上的,以后该producer的消息都会发送到这个broker上,就做不到负载均衡了。因此建议配置autoCreateTopicEnable=false,这样如果发送时的topic没有建立就会报错。        2.2)若有对应的TopicPublishInfo,则返回该topic对应的TopicPublishInfo。   4、根据发送方式设置失败重发次数,sync=3,async=1,sendOneWay=1。   5、选择一个发送队列。int pos = Math.abs(index++) % this.messageQueueList.size();每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询,如果入参lastBrokerName不为空,代表上次选择的queue发送失败,这次选择应该避开同一个queue【范围是仅该条消息】。   6、进行真正的消息发送!
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());               6.1、
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);    6.2、

byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}

if (hasCheckForbiddenHook()) {                                                          6.3、
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}

if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}

if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                  6.4、
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                      6.5、
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

SendResult sendResult = null;
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                6.6、
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
return sendResult;
finally {
msg.setBody(prevBody);
}
}
6.1)String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());     获取broker地址,其中比较细节的是如果没获取到地址,会调用tryToFindTopicPublishInfo(mq.getTopic());从namesrv更新topic路由后再次获取。       6.2)此时获取的地址形如10.3.253.227:10911,rocketMq默认开启VipChannel所以发送的时候是发送到10.3.253.227:10909.新的addr是调用brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);得到的。这里能解释一个问题,当时测试的时候往pim的mq发消息,就是报错,跟10911相关的,默认开启VipChannel,所以发送到了10909上,所以需要手动加上一行代码producer.setVipChannelEnabled(false);   6.3)依次检查producer客户端是否设置了ForbiddenContext和SendMessageHook,如果有则执行对应方法(个人感觉这个类似于切面):this.executeCheckForbiddenHook(checkForbiddenContext);      this.executeSendMessageHookBefore(context);       6.4)之后根据之前得到的一系列发送消息的配置,来构造发送给Broker的请求头数据。       6.5)如果请求头的topic是以%RETRY%开头的,就再给请求头额外配置重新消费次数和最大重新消费次数                      String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                      requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                      String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                      requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));        6.6)
request.setBody(msg.getBody());              6.6.1、
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);    6.6.2、

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {                 6.6.3
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);    6.6.5
}

});

this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);               6.6.4、

channel.writeAndFlush(request).addListener(new ChannelFutureListener() {            //netty相关
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
});
 6.6.1)构造request的code和body(实际发送的消息内容)【Byte类型】                6.6.2)调用SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request)  方法                6.6.3)进而调用RemotingClient.invokeSync(addr, request, timeoutMillis);                6.6.4)进而调用NettyRemotingClient.invokeSyncImpl(channel, request, timeoutMillis)给broker发送消息,实际是netty间的通信channel.writeAndFlush(request),并且在超时时间内等待返回结果                6.6.5)根据服务端的响应结果,调用processSendResponse(brokerName, msg, response);组装SendResult。        最后将SendResult层层返回,就回到了最初的样子  SendResult sendResult = producer.send(msg);        其中有一个问题还没理解,使用Netty传输POJO对象,重点在于对象的序列化。序列化的对象通过TCP进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。首先Java需要序列化的对象,需要实现java.io.Serializable接口。可以看到Message对象是实现该接口了的。但是发送消息时实际发送的对象是RemotingCommand,但是他并没有序列化。这是为什么呢?
最后简单总结一下producer发送消息的过程(以sync为例):检查producer状态,检查参数,获取topic的路由信息,选择发送队列,获取broker的地址,配置request请求头,给request添加code值和body的内容,使用netty将从NettyRemotingClient【netty服客户端】消息传输到broker(NettyRemotingServer【netty服务端】),等待响应结果,组装SendResult 并返回。大致如下流程:






阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: