RocketMQ client客户端模块源码分析一(生产者)
2016-05-27 10:23
561 查看
类结构图
该图只列出了核心的接口和一些关键的实现类,简化了结构图,让我们对client模块有个整体的认识。从图中我们可以看出以下关键点。
客户端包的核心角色是MQ管理者、MQ消息生产者、MQ消息消费者。这与它的功能相互吻合。
客户端包的设计应用了门面模式。对外针对开发者有一套简单的api,对内有内部的接口和实现,这些是不会暴露给开发者的,因此开发者再使用的时候请不要面向impl包及其下面的实现类进行编程,而应该面向抽象编程,否则当客户端包升级的时候,我们就会很尴尬了。
生产者和消费者有公共的职责和实现,他们通过组合都使用了MQClientApiImpl这个类实现了与nameserver和broker的远程通讯。
生产者源码分析
生产者角色的实现相对简单一些,让我们先来看看RocketMQ的客户端中的生产者源码是什么样的。使用案例
消息生产者使用方法如下面代码所示:package com.sean; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args){ DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("192.168.58.163:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PullTopic", "pull", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } }
可以看出来先构造一个生产者对象,设置必要的配置信息包括nameServer服务器地址namesrvAddr,生产者集群名称producerGroup(一般是应用的名称)即可,然后调用api发送消息到broker上去了。
核心源码分析
发送消息核心源码实现位于类DefaultMQProducer中,它的核心方法send又委托给了内部实现类DefaultMQProducerImpl去实现了,因此我进入到DefaultMQProducerImpl的sendi方法中一探究竟。send方法最终委托给了一个内部方法sendDefaultImpl,关键是这个方法的源码实现,我们一起看看这个源码。
private SendResult sendDefaultImpl(// Message msg,// final CommunicationMode communicationMode,// final SendCallback sendCallback, final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); //检查服务状态 Validators.checkMessage(msg, this.defaultMQProducer);//检查消息合法性 final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; //发送超时时间。 final long beginTimestamp = System.currentTimeMillis(); //发送开始时间。 long endTimestamp = beginTimestamp; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获得topic对应的发布信息。 if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); //重试次数,重试几次避免网络引起的不可用。 int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {//尝试发送设置的次数,若已经超时了则不再发送。 String lastBrokerName = null == mq ? null : mq.getBrokerName();//上次尝试发送使用的broker。 MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);//选择一个borker,一般会避免使用上次尝试过的broker。 if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName();//记录发送过的broker列表。 try { sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);//调用该方法真正发送。 endTimestamp = System.currentTimeMillis();//记录发送耗时。 switch (communicationMode) {//根据通讯模式来针对发送结果做处理。 case ASYNC: return null;//异步发送立即返回。 case ONEWAY: return null;//单向通讯也立即返回。 case SYNC: //同步发送则如果未成功则重试,否则直接返回发送结果。 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); continue; } catch (MQClientException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); continue; } catch (MQBrokerException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // end of for if (sendResult != null) { return sendResult; //返回发送结果。 } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", // times, // (System.currentTimeMillis() - beginTimestamp), // msg.getTopic(),// Arrays.toString(brokersSent)); //构造错误信息。 info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); throw new MQClientException(info, exception); //发送失败抛出异常,则发送应用要做好相应的业务处理。 } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null); }//获得队注册服务器失败异常。 throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null); //没有对应的路由信息异常。 }
从代码我们可以发现发送消息有这些特点。
发送消息支持失败重试,还支持超时退出抛异常的可能性。这主要都是为了提升可用性和容错性。
发送消息支持异步、同步和单向等三种通讯模型。重试次数只对同步模式有效。
发送可能导致发送重复消息,因为发送失败有可能不是真的失败,服务器可能已经存储了消息,因此消费要支持幂等性。
发送应用要做好发送失败的异常处理,避免丢失消息。
发送消息选择broker和queueId的时候是轮询的,这个路由策略可能需要支持更多扩展,比如就近发送。
该方法又调用了方法sendKernelImpl发送消息,再进入该方法看看。
private SendResult sendKernelImpl(final Message msg,// final MessageQueue mq,// final CommunicationMode communicationMode,// final SendCallback sendCallback,// final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //获得broker服务器的地址。 if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果地址为空,则再更新一次,。 } SendMessageContext context = null; if (brokerAddr != null) { //地址不为空,则继续发送消息。 byte[] prevBody = msg.getBody(); try { int sysFlag = 0; if (this.tryToCompressMessage(msg)) { //根据压缩配置执行消息压缩。 sysFlag |= MessageSysFlag.CompressedFlag; //压缩标记会记录在消息附件信息中,接收到消息根据标记解压 } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); //事务标志。 if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TransactionPreparedType; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); this.executeSendMessageHookBefore(context); } //组装发送消息的请求头。 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); //重试消息特殊处理。 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(new Integer(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } } //调用客户端api发送消息。 SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// brokerAddr,// 1 mq.getBrokerName(),// 2 msg,// 3 requestHeader,// 4 timeout,// 5 communicationMode,// 6 sendCallback// 7 ); if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); } }
该方法的职责是组装发送消息的格式,更加底层的一些消息发送协议处理,它最终又调用了this.mQClientFactory.getMQClientAPIImpl().sendMessage发送消息,我们继续最终它。
public SendResult sendMessage(// final String addr,// 1 final String brokerName,// 2 final Message msg,// 3 final SendMessageRequestHeader requestHeader,// 4 final long timeoutMillis,// 5 final CommunicationMode communicationMode,// 6 final SendCallback sendCallback// 7 ) throws RemotingException, MQBrokerException, InterruptedException { if (!UtilAll.isBlank(projectGroupPrefix)) { msg.setTopic(VirtualEnvUtil.buildWithProjectGroup(msg.getTopic(), projectGroupPrefix)); requestHeader.setProducerGroup(VirtualEnvUtil.buildWithProjectGroup( requestHeader.getProducerGroup(), projectGroupPrefix)); requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(), projectGroupPrefix)); } //转化为底层的远程通讯命令对象,这是底层通讯协议。 RemotingCommand request = null; if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } request.setBody(msg.getBody()); //三种通讯模型,调用对应的方法处理。 switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback); return null; case SYNC: return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); default: assert false; break; } return null; }
该方法最终又调用了rocketmq-remoting模块的远程通讯方法,通过该方法发送请求消息给broker服务器,根据地址找到缓存在本地的可用连接,这些连接缓存达到复用的目的,详细的内容参考该模块的源码分析。
相关文章推荐
- java结合WebSphere MQ实现接收队列文件功能
- rocketmq的安装(简单版)
- RocketMQ Filtersrv详解
- JMS-使用消息队列优化网站性能
- 架构优化 - 应用,MQ Broker,业务处理分层
- rocketmq命令行自动补全工具
- IBM WebSphere MQ介绍安装以及配置服务详解
- Websphere MQ 7.0.0 For Linux版安装
- Fourinone四合一分布式计算框架整体介绍
- IBM MQ常用命令
- AMQP协议
- IBM-MQ安装及使用实例
- 了解MSMQ
- IBM MQ Monitor
- 【原创】一种实现IBM MQ通道传输能力垂直扩展的方法 - An Approach for Scaling Up/Down IBM MQ Channel Throughput
- 【原创】IBM MQ触发条件解读 IBM MQ Triggering Conditions Interpretation
- IBM MQ 7.5 域环境配置问题处理
- IBM-MQ安装及使用实例
- MQ消息例子
- Active MQ 使用