您的位置:首页 > 其它

RocketMQ client客户端模块源码分析一(生产者)

2016-05-27 10:23 561 查看

类结构图



该图只列出了核心的接口和一些关键的实现类,简化了结构图,让我们对client模块有个整体的认识。从图中我们可以看出以下关键点。

客户端包的核心角色是MQ管理者、MQ消息生产者、MQ消息消费者。这与它的功能相互吻合。

客户端包的设计应用了门面模式。对外针对开发者有一套简单的api,对内有内部的接口和实现,这些是不会暴露给开发者的,因此开发者再使用的时候请不要面向impl包及其下面的实现类进行编程,而应该面向抽象编程,否则当客户端包升级的时候,我们就会很尴尬了。

生产者和消费者有公共的职责和实现,他们通过组合都使用了MQClientApiImpl这个类实现了与nameserver和broker的远程通讯。

生产者源码分析

生产者角色的实现相对简单一些,让我们先来看看RocketMQ的客户端中的生产者源码是什么样的。

使用案例

消息生产者使用方法如下面代码所示:

package com.sean;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
public static void main(String[] args){
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("192.168.58.163:9876");
try {
producer.start();

Message msg = new Message("PushTopic",
"push",
"1",
"Just for test.".getBytes());

SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());

msg = new Message("PushTopic",
"push",
"2",
"Just for test.".getBytes());

result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());

msg = new Message("PullTopic",
"pull",
"1",
"Just for test.".getBytes());

result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}finally{
producer.shutdown();
}
}
}

可以看出来先构造一个生产者对象,设置必要的配置信息包括nameServer服务器地址namesrvAddr,生产者集群名称producerGroup(一般是应用的名称)即可,然后调用api发送消息到broker上去了。

核心源码分析

发送消息核心源码实现位于类DefaultMQProducer中,它的核心方法send又委托给了内部实现类DefaultMQProducerImpl去实现了,因此我进入到DefaultMQProducerImpl的sendi方法中一探究竟。

send方法最终委托给了一个内部方法sendDefaultImpl,关键是这个方法的源码实现,我们一起看看这个源码。

private SendResult sendDefaultImpl(//
Message msg,//
final CommunicationMode communicationMode,//
final SendCallback sendCallback, final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); //检查服务状态
Validators.checkMessage(msg, this.defaultMQProducer);//检查消息合法性

final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; //发送超时时间。
final long beginTimestamp = System.currentTimeMillis(); //发送开始时间。
long endTimestamp = beginTimestamp;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获得topic对应的发布信息。
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); //重试次数,重试几次避免网络引起的不可用。
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {//尝试发送设置的次数,若已经超时了则不再发送。
String lastBrokerName = null == mq ? null : mq.getBrokerName();//上次尝试发送使用的broker。
MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);//选择一个borker,一般会避免使用上次尝试过的broker。
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();//记录发送过的broker列表。
try {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);//调用该方法真正发送。
endTimestamp = System.currentTimeMillis();//记录发送耗时。
switch (communicationMode) {//根据通讯模式来针对发送结果做处理。
case ASYNC:
return null;//异步发送立即返回。
case ONEWAY:
return null;//单向通讯也立即返回。
case SYNC: //同步发送则如果未成功则重试,否则直接返回发送结果。
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}

return sendResult;
default:
break;
}
}
catch (RemotingException e) {
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
exception = e;
endTimestamp = System.currentTimeMillis();
continue;
}
catch (MQClientException e) {
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
exception = e;
endTimestamp = System.currentTimeMillis();
continue;
}
catch (MQBrokerException e) {
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
exception = e;
endTimestamp = System.currentTimeMillis();
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}

throw e;
}
}
catch (InterruptedException e) {
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
}
else {
break;
}
} // end of for

if (sendResult != null) {
return sendResult; //返回发送结果。
}

String info =
String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
times, //
(System.currentTimeMillis() - beginTimestamp), //
msg.getTopic(),//
Arrays.toString(brokersSent)); //构造错误信息。

info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

throw new MQClientException(info, exception); //发送失败抛出异常,则发送应用要做好相应的业务处理。
}

List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException("No name server address, please set it."
+ FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null);
}//获得队注册服务器失败异常。

throw new MQClientException("No route info of this topic, " + msg.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null); //没有对应的路由信息异常。
}


从代码我们可以发现发送消息有这些特点。

发送消息支持失败重试,还支持超时退出抛异常的可能性。这主要都是为了提升可用性和容错性。

发送消息支持异步、同步和单向等三种通讯模型。重试次数只对同步模式有效。

发送可能导致发送重复消息,因为发送失败有可能不是真的失败,服务器可能已经存储了消息,因此消费要支持幂等性。

发送应用要做好发送失败的异常处理,避免丢失消息。

发送消息选择broker和queueId的时候是轮询的,这个路由策略可能需要支持更多扩展,比如就近发送。

该方法又调用了方法sendKernelImpl发送消息,再进入该方法看看。

private SendResult sendKernelImpl(final Message msg,//
final MessageQueue mq,//
final CommunicationMode communicationMode,//
final SendCallback sendCallback,//
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //获得broker服务器的地址。
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果地址为空,则再更新一次,。
}

SendMessageContext context = null;
if (brokerAddr != null) { //地址不为空,则继续发送消息。
byte[] prevBody = msg.getBody();
try {
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) { //根据压缩配置执行消息压缩。
sysFlag |= MessageSysFlag.CompressedFlag; //压缩标记会记录在消息附件信息中,接收到消息根据标记解压
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); //事务标志。
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TransactionPreparedType;
}

if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}

if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
this.executeSendMessageHookBefore(context);
}

//组装发送消息的请求头。

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//重试消息特殊处理。
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
}

//调用客户端api发送消息。
SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr,// 1
mq.getBrokerName(),// 2
msg,// 3
requestHeader,// 4
timeout,// 5
communicationMode,// 6
sendCallback// 7
);

if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}

return sendResult;
}
catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
finally {
msg.setBody(prevBody);
}
}

该方法的职责是组装发送消息的格式,更加底层的一些消息发送协议处理,它最终又调用了this.mQClientFactory.getMQClientAPIImpl().sendMessage发送消息,我们继续最终它。

public SendResult sendMessage(//
final String addr,// 1
final String brokerName,// 2
final Message msg,// 3
final SendMessageRequestHeader requestHeader,// 4
final long timeoutMillis,// 5
final CommunicationMode communicationMode,// 6
final SendCallback sendCallback// 7
) throws RemotingException, MQBrokerException, InterruptedException {
if (!UtilAll.isBlank(projectGroupPrefix)) {
msg.setTopic(VirtualEnvUtil.buildWithProjectGroup(msg.getTopic(), projectGroupPrefix));
requestHeader.setProducerGroup(VirtualEnvUtil.buildWithProjectGroup(
requestHeader.getProducerGroup(), projectGroupPrefix));
requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
projectGroupPrefix));
}
//转化为底层的远程通讯命令对象,这是底层通讯协议。
RemotingCommand request = null;
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
}
else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}

request.setBody(msg.getBody());

//三种通讯模型,调用对应的方法处理。
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
default:
assert false;
break;
}

return null;
}

该方法最终又调用了rocketmq-remoting模块的远程通讯方法,通过该方法发送请求消息给broker服务器,根据地址找到缓存在本地的可用连接,这些连接缓存达到复用的目的,详细的内容参考该模块的源码分析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息