RocketMQ源码 — 三、 Producer消息发送过程
2018-02-28 09:06
691 查看
Producer
消息发送producer start
producer启动过程如下图public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // check GroupName this.checkConfig(); // 改变ClientConfig.instanceName为pid if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 初始化mQClientFactory为MQClientInstance,并将该实例加入factoryTable this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 将producer注册到MQClientInstance.producerTbale boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 保存topic对应的routeInfo this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { // 启动MQClientInstance mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, "// + this.serviceState// + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } // 启动的时候向所有的broker发送heartbeat this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }
消息发送流程
消息发送过程:先由producer封装通过netty发送到broker,然后由broker进行保存,过程如下============================= producer发送消息 =============================
主要逻辑在DefaultMQProducerImpl.sendDefaultImpl
获取topicRouteInfo
DefaultMQProducerImpl.tryToFindTopicPublishInfo
->
MQClientInstance.updateTopicRouteInfoFromNameServer
private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable:放置所有的producer updateTopicRouteInfoFromNameServer - 获取topicRouteInfo - 然后遍历producerTable,更新所有producer的topicRouteInfo - 遍历consumerTable,更新所有的consumer的topicSubscribeInfo - 加入topicRouteTable
->
MQClientAPIImpl.getTopicRouteInfoFromNameServer
通过netty向nameServer发出请求,请求topicRouteInfo
选取一个messageQueue
从messageQueueList取一个MessageQueue
netty发送消息
DefaultMQProducer.sendKernelImpl,
============================= broker接收消息 =============================
因为使用netty作为网络通信工具,broker也是先使用netty接收到信息,然后调用注册的processor处理
parseRequestHeader
利用反射构造requestHeader
构造SendMessageContext
执行beforeHook
发送消息
构造MessageExtBrokerInner
保存消息DefaultMessageStore.putMessage
CommitLog.putMessage
从mapedFileQueue中取出一个mapedFile,appendMessag使用directBuffer的方式写入commitLog
同步或者异步刷盘
同步双写
执行afterHook
issue
broker怎么会使用SendMessageProcessor来处理producer发来的消息?
// 在broker初始化的时候会注册所有的processor,registerProcessor SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList) sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
broker怎么接收消息并写入文件
找到需要写入的mapedFile(最后一个,或者新建一个)使用mapedFile.appendMessage
怎么获取需要写入的mapedFile
见时序图:RocketMQ.asta(Broker收到sendRequest)mapedFile.appendMessage的过程
见时序图:RocketMQ.asta(Broker收到sendRequest)消息发送过程中涉及到的类
DefaultMQProducerImpl
topicPublishInfoTable
里面存放topic对应的messageQueue等信息// topic对应的消息是否有序 private boolean orderTopic = false; // 有没有routerInfo private boolean haveTopicRouterInfo = false; // topic对应的messageQueue private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 消息发往哪一个queue private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); private TopicRouteData topicRouteData;
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
put:
在producer start的时候会根据当前producer 的topic新建一个TopicPushlishInfo放进去
在发送信息之前会获取topic对应的topicPublishInfo,这个时候会去nameServer 查询最新的信息并更新table中所有的记录
defaultMQProducer
private final DefaultMQProducer defaultMQProducer;
发送消息的类,在DefaultMQProducer的构造函数里new DefaultMQProducerImpl传入DefaultMQProducer自身,所以这里DefaultMQProducerImpl.defaultMQProducer默认就是DefaultMQProducer
mQClientFactory
private MQClientInstance mQClientFactory;
MQClientManager是单例,包含两个属性
// 用来生成MQClientInstance的id,每个递增 private AtomicInteger factoryIndexGenerator = new AtomicInteger(); // 包含clientId到MQClientInstance的映射,管理client private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
通过getInstance获取实例之后调用getAndCreateMQClientInstance创建MQClientInstance。
在DefaultMQProducerImpl.start的时候调用mQClientFactory.start启动
MQClientInstance
字段
// 在MQClientManager中new MQCLientInstance的时候,传入DefaultMQProducer,因为DefaultMQProducer继承了ClientConfig private final ClientConfig clientConfig; // new 的时候传入,由MQClientManager生成 private final int instanceIndex; // new的时候传入,由clientConfig.buildMQClientId生成,形式为:ip@instanceName private final String clientId; // 每个group对应的MQProducerInner // 在producer启动的时候注册到这儿 // 在每次发送message的时候从nameServer获取topicRouteData并更新每个producer对应的信息 private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); // 每个group对应的MQConsumerInner // consumer start的时候注册到这儿 // 在每次发送message的时候从nameServer获取topicRouteData并更新每个consumer对应的信息 private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); // 每个group对应的adminExtInner,在NameServer 启动的时候会注册DefaultMQAdminExt private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>(); // topic对应的TopicRouteTable // 在每次发送message的时候从nameServer获取topicRouteData并更新对应的信息 private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>(); // 所有broker的地址 // 在每次发送message的时候从nameServer获取topicRouteData并更新对应的信息 private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>(); // 定时任务线程池,包括:fetchNameServerAddr,updateTopicRouteInfoFromNameServer等 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "MQClientFactoryScheduledThread"); } }); // service private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final NettyClientConfig nettyClientConfig; // 用来client远程通信,使用netty private final MQClientAPIImpl mQClientAPIImpl;
关键方法
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr()); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } } private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 获取nameServer的地址 MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 更新topicRouteData MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 清理掉线的broker MQClientInstance.this.cleanOfflineBroker(); // 给broker发送心跳 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 保存consumerOffset MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 根据processQueueTable的大小决定是否需要增加或者减少threadPool的大小 // 目前尚未实现具体的增加或者减少的逻辑 MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
DefaultMessageStore
字段
private final MessageStoreConfig messageStoreConfig; // CommitLog private final CommitLog commitLog; // topic和queueId唯一确定一个consumeQueue // put: // 1. 在getMessage的时候会调用findConsumeQueue,如果consumeQueueTable不存在对应的(topic,queueId),则新建一个加入table // 2. 在启动的时候load private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; /** service */ // 消息刷盘,依次循环consumeQueueTable,将每个consumeQueue中的mapedFileQueue commit private final FlushConsumeQueueService flushConsumeQueueService; // 删除过时的commitlog文件 private final CleanCommitLogService cleanCommitLogService; // 删除consumeQueue文件 private final CleanConsumeQueueService cleanConsumeQueueService; // 在新建DefaultMessageStore的时候新建new // 在messageStore start的时候start // 在run方法中循环取出requestQueue(PriorityBlokingQueue,take的时候是阻塞的)里面的request处理——即新建mapedFile private final AllocateMapedFileService allocateMapedFileService; // TODO 暂时还不能理解indexservice private final IndexService indexService; // TODO private final ReputMessageService reputMessageService; // HA 高可用,同步双写,异步复制 private final HAService haService; private final ScheduleMessageService scheduleMessageService; private final StoreStatsService storeStatsService;
关键方法
public void start() throws Exception { this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } this.reputMessageService.start(); this.haService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; }
CommitLog
消息写入内存,保存文件的地方字段
// 消息文件队列,包含所有保存在磁盘上的文件 private final MapedFileQueue mapedFileQueue; // private final DefaultMessageStore defaultMessageStore; // 消息刷盘 private final FlushCommitLogService flushCommitLogService; // 添加消息的回调,在doAppend方法中追加消息到内存 private final AppendMessageCallback appendMessageCallback; // 记录topic对应每个队列的offset private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
MapedFileQueue
管理mapedFile,新建、获取、删除mapedFile将消息写入文件字段
// 每次删除文件个数,作为是否删除的一个参数 private static final int DeleteFilesBatchMax = 10; // 文件的存储路径 private final String storePath; // 每个文件的大小 private final int mapedFileSize; // 所有文件列表 private final List<MapedFile> mapedFiles = new ArrayList<MapedFile>(); // 读锁 private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 新建mapedFile的地方,将新建mapedFile的 request添加到requesttable和requestQueue中 // 在MessageStore启动的时候会启动AllocateMapedFileService这个线程,执行requestQueue里面的request,新建mapedFile private final AllocateMapedFileService allocateMapedFileService;
MapedFile
和文件一对一关系字段
// 保存消息的文件名 private final String fileName; // 该文件的全局offset,也就是文件名的前缀 private final long fileFromOffset; // 文件大小 private final int fileSize; // 文件对象 private final File file; // 文件映射为的内存 private final MappedByteBuffer mappedByteBuffer; // 文件写的位置 private final AtomicInteger wrotePostion = new AtomicInteger(0); // 刷盘之后的位置 private final AtomicInteger committedPosition = new AtomicInteger(0); // nio阻塞 private FileChannel fileChannel; private volatile long storeTimestamp = 0; private boolean firstCreateInQueue = false;
方法
public int commit(final int flushLeastPages) { // 判断是否需要flush if (this.isAbleToFlush(flushLeastPages)) { //判断文件是否被占用,也就是说每次commit的时候不一定成功 if (this.hold()) { // 如果没有被占用则将内存刷到磁盘上 int value = this.wrotePostion.get(); // 将内存强制写入磁盘 this.mappedByteBuffer.force(); this.committedPosition.set(value); // 释放占用 this.release(); } else { // 尝试占用失败,重置committedPosition log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); this.committedPosition.set(this.wrotePostion.get()); } } return this.getCommittedPosition(); } public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) { assert msg != null; assert cb != null; // file当前position int currentPos = this.wrotePostion.get(); if (currentPos < this.fileSize) { // 获取DirectByteBuffer的一个分片,重置了mark,position,limit是剩下的大小,和原来的buffer共享同一块内存 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); this.wrotePostion.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: " + this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
相关文章推荐
- RocketMQ源码 — 三、 Producer消息发送过程
- RocketMQ源码解析-Producer消息发送
- RocketMQ原理解析-producer 4.发送分布式事物消息
- RocketMQ原理解析-producer 4.发送分布式事物消息
- rocketmq-producer之发送事物消息
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- Kafka Producer 发送消息源码阅读
- OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二
- RocketMQ原理解析-producer 2.如何发送消息
- 结合源码说说使用Handler发送异步消息的实现过程
- RocketMQ原理解析-producer 2.如何发送消息
- RocketMQ源码 — 四、 Consumer 接收消息过程
- RocketMQ源码分析(二)Producer端发送数据
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ生产者消息发送源码解析总结
- 分布式消息队列 RocketMQ源码解析:事务消息
- Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能 (阿里中间件团队博客)
- 源码分析RocketMQ消息消费机制----消费者拉取消息机制
- android中handler消息发送机制源码剖析
- Kafka、RabbitMQ、RocketMQ 消息中间件的对比 | 消息发送性能篇