RocketMQ源码解析-Broker的消息存储
2017-11-21 23:38
756 查看
Broker接收得到的来自provider的消息在sendMessageProcessor的sendMessage()方法当中处理。
在sendMessage()方法当中会直接将所接收得到的消息封装为MessageExtBrokerInner。
然后将封装得到的消息交由消息存储层,默认defaultMessageStore,调用putMessage()方法。
在DefaultMessageStrore里将会调用commitLog的putMessage()方法来存储消息。
在broker中,存储消息的文件在MapedFileQueue当中将不连续的物理文件作为连续的逻辑文件进行处理。也就是说,在broker中,由MapedFile来管理物理文件的映射。所有的消息队列共用这里的物理文件进行消息的物理存储,用过记录各个消息的偏移量offset来在这里的物理文件上得到所要取得到的消息具体数据。
作为逻辑上连续的文件队列,通过规定每个文件统一的大小可以通过offset精准的取得消息所在的物理文件上的位置。
可以通过下面这个方法,可以清楚地看到是如何在逻辑的连续文件队列中通过offset定位到文件的。
在这个方法可以清楚的看到恰恰是通过offset与规定的文件大小相除得到具体的文件在队列中的位置。
我们可以在DefaultMessageCallBack中看到,在MapedFileQueue当中会调用他的doAppend()回调方法来具体看到消息的存储过程。
在这个方法中,我们可以具体看到消息在broker当中的消息物理存储结构。在这里可以清楚的看到,消息将在这里按照存储规则写入byteBuffer。并通过byteBuffer的put()方法写入物理文件当中。
此时,消息的物理存储已经结束,但是,仍没有将消息的具体数据写入consumerQueue当中以便消费者消费。
在将消息存入物理文件之后,需要构造dispatchRequest,以便将消息的位置数据分发给相应的topic下面的ConsumerQueue以便消费者取得。
DefaultMessageStore将会将该条消息的位置信息放入DispatchMessageService的缓冲队列中,而DispatchMessageService将会周期性的从缓冲队列当中去封装有消息位置信息的dispatchRequest进行处理。
通过topic以及消息的queueId可以精确的得到消息所对应的ConsumQueue。
ConsumQueue的存储单元固定二十字节。
前八个字节存储消息在commitLog当中的偏移量offset位置。
中四个字节存储消息的大小。
后八个字节记录消息的tagsCode。
然后写在ConsumQueue自己的逻辑文件队列MapedFileQueue当中。
在生产者取消息的时候只需要根据offset取得consumQueue的相应存储单元,就可以在commitLog上定位得到所要取得的具体消息数据。
在消息的位置信息放入ConsumQueue后将会将消息加入indexService的阻塞队列,等待indexService定期构建索引。
在索引文件的key当中,通过 topic + “#” + key 进行构造key。 作为构造成员的key则是由topic和queueId组成。
而value则是在commitLog上的物理文件所在的位置,也是八字节来保存。
保存过程中,先通过keyHash与索引文件的slot数量取余,计算得到相对slot的存储位置。在用取余的结果根据文件头偏移量与每个slot的偏移量取得具体的的slot偏移量。
在计算得到slot的偏移量之后,根据索引文件的头大小以及所有slot的偏移量之和,加上之前所有的消息索引具体偏移量之和,得到消息索引的具体物理存放位置。
索引共占20字节,四字节keyHash,八字节commitLog的物理偏移量,四字节与索引文件建立时间的时间差,四字节的slot位置。
写入相应的索引文件,索引宣告建立完毕。
之后如果该broekr是同步master,将会在所有的broker将commitLog消息复制完毕到相应的offset之前,都会阻塞。
以上是Broker的消息存储。
在sendMessage()方法当中会直接将所接收得到的消息封装为MessageExtBrokerInner。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(sysFlag); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader .getReconsumeTimes());
然后将封装得到的消息交由消息存储层,默认defaultMessageStore,调用putMessage()方法。
在DefaultMessageStrore里将会调用commitLog的putMessage()方法来存储消息。
在broker中,存储消息的文件在MapedFileQueue当中将不连续的物理文件作为连续的逻辑文件进行处理。也就是说,在broker中,由MapedFile来管理物理文件的映射。所有的消息队列共用这里的物理文件进行消息的物理存储,用过记录各个消息的偏移量offset来在这里的物理文件上得到所要取得到的消息具体数据。
作为逻辑上连续的文件队列,通过规定每个文件统一的大小可以通过offset精准的取得消息所在的物理文件上的位置。
可以通过下面这个方法,可以清楚地看到是如何在逻辑的连续文件队列中通过offset定位到文件的。
public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { this.readWriteLock.readLock().lock(); MapedFile mapedFile = this.getFirstMapedFile(); if (mapedFile != null) { int index = (int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize)); if (index < 0 || index >= this.mapedFiles.size()) { logError .warn( "findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",// offset,// index,// this.mapedFileSize,// this.mapedFiles.size(),// UtilAll.currentStackTrace()); } try { return this.mapedFiles.get(index); } catch (Exception e) { if (returnFirstOnNotFound) { return mapedFile; } } } } catch (Exception e) { log.error("findMapedFileByOffset Exception", e); } finally { this.readWriteLock.readLock().unlock(); } return null; }
在这个方法可以清楚的看到恰恰是通过offset与规定的文件大小相除得到具体的文件在队列中的位置。
我们可以在DefaultMessageCallBack中看到,在MapedFileQueue当中会调用他的doAppend()回调方法来具体看到消息的存储过程。
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg; // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset); // Record ConsumeQueue information String key = msgInner.getTopic() + "-" + msgInner.getQueueId(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queue case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType: queueOffset = 0L; break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default: break; } /** * Serialize message */ final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; final byte[] topicData = msgInner.getTopic().getBytes(); final int topicLength = topicData == null ? 0 : topicData.length; final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; final int msgLen = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC + 4 // 4 QUEUEID + 4 // 5 FLAG + 8 // 6 QUEUEOFFSET + 8 // 7 PHYSICALOFFSET + 4 // 8 SYSFLAG + 8 // 9 BORNTIMESTAMP + 8 // 10 BORNHOST + 8 // 11 STORETIMESTAMP + 8 // 12 STOREHOSTADDRESS + 4 // 13 RECONSUMETIMES + 8 // 14 Prepared Transaction Offset + 4 + bodyLength // 14 BODY + 1 + topicLength // 15 TOPIC + 2 + propertiesLength // 16 propertiesLength + 0; // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, a9b2 msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetMsgStoreItemMemory(maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode); // 3 The remaining space may be any value // // Here the length of the specially set maxBlank byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset); } // Initialization of storage space this.resetMsgStoreItemMemory(msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.msgStoreItemMemory.put(msgInner.getBornHostBytes()); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.msgStoreItemMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset); switch (tranType) { case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType: break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; }
在这个方法中,我们可以具体看到消息在broker当中的消息物理存储结构。在这里可以清楚的看到,消息将在这里按照存储规则写入byteBuffer。并通过byteBuffer的put()方法写入物理文件当中。
此时,消息的物理存储已经结束,但是,仍没有将消息的具体数据写入consumerQueue当中以便消费者消费。
DispatchRequest dispatchRequest = new DispatchRequest(// topic,// 1 queueId,// 2 result.getWroteOffset(),// 3 result.getWroteBytes(),// 4 tagsCode,// 5 msg.getStoreTimestamp(),// 6 result.getLogicsOffset(),// 7 msg.getKeys(),// 8 /** * Transaction */ msg.getSysFlag(),// 9 msg.getPreparedTransactionOffset());// 10 this.defaultMessageStore.putDispatchRequest(dispatchRequest);
在将消息存入物理文件之后,需要构造dispatchRequest,以便将消息的位置数据分发给相应的topic下面的ConsumerQueue以便消费者取得。
DefaultMessageStore将会将该条消息的位置信息放入DispatchMessageService的缓冲队列中,而DispatchMessageService将会周期性的从缓冲队列当中去封装有消息位置信息的dispatchRequest进行处理。
通过topic以及消息的queueId可以精确的得到消息所对应的ConsumQueue。
ConsumQueue的存储单元固定二十字节。
前八个字节存储消息在commitLog当中的偏移量offset位置。
中四个字节存储消息的大小。
后八个字节记录消息的tagsCode。
然后写在ConsumQueue自己的逻辑文件队列MapedFileQueue当中。
在生产者取消息的时候只需要根据offset取得consumQueue的相应存储单元,就可以在commitLog上定位得到所要取得的具体消息数据。
在消息的位置信息放入ConsumQueue后将会将消息加入indexService的阻塞队列,等待indexService定期构建索引。
在索引文件的key当中,通过 topic + “#” + key 进行构造key。 作为构造成员的key则是由topic和queueId组成。
private String buildKey(final String topic, final String key) { return topic + "#" + key; }
而value则是在commitLog上的物理文件所在的位置,也是八字节来保存。
保存过程中,先通过keyHash与索引文件的slot数量取余,计算得到相对slot的存储位置。在用取余的结果根据文件头偏移量与每个slot的偏移量取得具体的的slot偏移量。
int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
在计算得到slot的偏移量之后,根据索引文件的头大小以及所有slot的偏移量之和,加上之前所有的消息索引具体偏移量之和,得到消息索引的具体物理存放位置。
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE + this.indexHeader.getIndexCount() * INDEX_SIZE;
索引共占20字节,四字节keyHash,八字节commitLog的物理偏移量,四字节与索引文件建立时间的时间差,四字节的slot位置。
写入相应的索引文件,索引宣告建立完毕。
之后如果该broekr是同步master,将会在所有的broker将commitLog消息复制完毕到相应的offset之前,都会阻塞。
以上是Broker的消息存储。
相关文章推荐
- RocketMQ原理解析-broker 2.消息存储
- RocketMQ源码深度解析一之消息存储
- RocketMQ原理解析-broker 2.消息存储
- RocketMQ源码深度解析三之Broker篇
- RocketMQ原理解析-broker 2.消息存储
- RocketMQ源码解析-事务消息的二阶段提交
- RocketMQ原理解析-producer 5.消息在broker落地之普通消息
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ事务消息源码解析及关键点总结(基于4.0.0+版本)
- 分布式消息队列 RocketMQ源码解析:事务消息
- 消息中间件 RocketMQ源码解析:Message存储
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ源码解析:事务消息
- RocketMQ源码解析:事务消息