您的位置:首页 > 编程语言 > Java开发

RocketMQ 源码分析 —— Message 存储

2018-03-27 15:45 776 查看
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-store/

1、概述

本文接《RocketMQ 源码分析 —— Message 发送与接收》。 主要解析 
CommitLog
 存储消息部分。

2、CommitLog 结构

CommitLog
MappedFileQueue
MappedFile
 的关系如下:

CommitLog
 : 
MappedFileQueue
 : 
MappedFile
 = 1 : 1 : N。反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlogYunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760-rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000-rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648-rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296
CommitLog
MappedFileQueue
MappedFile
 的定义如下:
MappedFile
 :00000000000000000000、00000000001073741824、00000000002147483648等文件。
MappedFileQueue
 :
MappedFile
 所在的文件夹,对 
MappedFile
 进行封装成文件队列,对上层提供可无限使用的文件容量。每个 
MappedFile
 统一文件大小。
文件命名方式:fileName
= fileName[n - 1] + mappedFileSize。在 
CommitLog
 里默认为 1GB。

CommitLog
 :针对 
MappedFileQueue
 的封装使用。
CommitLog
 目前存储在 
MappedFile
 有两种内容类型:MESSAGE :消息。
BLANK :文件不足以存储消息时的空白占位。
CommitLog
 存储在 
MappedFile
的结构:
MESSAGE
 在 
CommitLog
 存储结构:
第几位字段说明数据类型字节数
1MsgLen消息总长度Int4
2MagicCodeMESSAGE_MAGIC_CODEInt4
3BodyCRC消息内容CRCInt4
4QueueId消息队列编号Int4
5FlagflagInt4
6QueueOffset消息队列位置Long8
7PhysicalOffset物理位置。在 
CommitLog
 的顺序存储位置。
Long8
8SysFlagMessageSysFlagInt4
9BornTimestamp生成消息时间戳Long8
10BornHost生效消息的地址+端口Long8
11StoreTimestamp存储消息时间戳Long8
12StoreHost存储消息的地址+端口Long8
13ReconsumeTimes重新消费消息次数Int4
14PreparedTransationOffset Long8
15BodyLength + Body内容长度 + 内容Int + Bytes4 + bodyLength
16TopicLength + TopicTopic长度 + TopicByte + Bytes1 + topicLength
17PropertiesLength + Properties拓展字段长度 + 拓展字段Short + Bytes2 + PropertiesLength
BLANK
 在 
CommitLog
 存储结构:
第几位字段说明数据类型字节数
1maxBlank空白长度Int4
2MagicCodeBLANK_MAGIC_CODEInt4

3、CommitLog 存储消息


CommitLog#putMessage(...)

  1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {  2:     // Set the storage time  3:     msg.setStoreTimestamp(System.currentTimeMillis());  4:     // Set the message body BODY CRC (consider the most appropriate setting  5:     // on the client)  6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));  7:     // Back to Results  8:     AppendMessageResult result = null;  9:  10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); 11:  12:     String topic = msg.getTopic(); 13:     int queueId = msg.getQueueId(); 14:  15:     // 事务相关 TODO 待读:事务相关 16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 19:         // Delay Delivery 20:         if (msg.getDelayTimeLevel() > 0) { 21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 23:             } 24:  25:             topic = ScheduleMessageService.SCHEDULE_TOPIC; 26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 27:  28:             // Backup real topic, queueId 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 32:  33:             msg.setTopic(topic); 34:             msg.setQueueId(queueId); 35:         } 36:     } 37:  38:     long eclipseTimeInLock = 0; 39:  40:     // 获取写入映射文件 41:     MappedFile unlockMappedFile = null; 42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); 43:  44:     // 获取写入锁 45:     lockForPutMessage(); //spin... 46:     try { 47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); 48:         this.beginTimeInLock = beginLockTimestamp; 49:  50:         // Here settings are stored timestamp, in order to ensure an orderly 51:         // global 52:         msg.setStoreTimestamp(beginLockTimestamp); 53:  54:         // 当不存在映射文件时,进行创建 55:         if (null == mappedFile || mappedFile.isFull()) { 56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise 57:         } 58:         if (null == mappedFile) { 59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 60:             beginTimeInLock = 0; 61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); 62:         } 63:  64:         // 存储消息 65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback); 66:         switch (result.getStatus()) { 67:             case PUT_OK: 68:                 break; 69:             case END_OF_FILE: // 当文件尾时,获取新的映射文件,并进行插入 70:                 unlockMappedFile = mappedFile; 71:                 // Create a new file, re-write the message 72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0); 73:                 if (null == mappedFile) { 74:                     // XXX: warn and notify me 75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 76:                     beginTimeInLock = 0; 77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); 78:                 } 79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback); 80:                 break; 81:             case MESSAGE_SIZE_EXCEEDED: 82:             case PROPERTIES_SIZE_EXCEEDED: 83:                 beginTimeInLock = 0; 84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); 85:             case UNKNOWN_ERROR: 86:                 beginTimeInLock = 0; 87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 88:             default: 89:                 beginTimeInLock = 0; 90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 91:         } 92:  93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; 94:         beginTimeInLock = 0; 95:     } finally { 96:         // 释放写入锁 97:         releasePutMessageLock(); 98:     } 99: 100:     if (eclipseTimeInLock > 500) {101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);102:     }103: 104:     // 105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);107:     }108: 109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);110: 111:     // Statistics112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());114: 115:     // 进行同步||异步 flush||commit116:     GroupCommitRequest request = null;117:     // Synchronization flush118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;120:         if (msg.isWaitStoreMsgOK()) {121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());122:             service.putRequest(request);123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());124:             if (!flushOK) {125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()126:                     + " client address: " + msg.getBornHostString());127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);128:             }129:         } else {130:             service.wakeup();131:         }132:     }133:     // Asynchronous flush134:     else {135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {136:             flushCommitLogService.wakeup(); // important:唤醒commitLog线程,进行flush137:         } else {138:             commitLogService.wakeup();139:         }140:     }141: 142:     // Synchronous write double 如果是同步Master,同步到从节点 // TODO 待读:数据同步143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {144:         HAService service = this.defaultMessageStore.getHaService();145:         if (msg.isWaitStoreMsgOK()) {146:             // Determine whether to wait147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {148:                 if (null == request) {149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());150:                 }151:                 service.putRequest(request);152: 153:                 service.getWaitNotifyObject().wakeupAll();154: 155:                 boolean flushOK =156:                     // TODO157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());158:                 if (!flushOK) {159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "160:                         + msg.getTags() + " client address: " + msg.getBornHostString());161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);162:                 }163:             }164:             // Slave problem165:             else {166:                 // Tell the producer, slave not available167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);168:             }169:         }170:     }171: 172:     return putMessageResult;173: }
说明 :存储消息,并返回存储结果。
第 2 行 :设置存储时间等。
第 16 至 36 行 :事务消息相关,暂未了解。
第 45 & 97 行 :获取锁与释放锁。
第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
第 55 至 62 行 :获取 
MappedFile
,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)
第 65 行 :插入消息到 
MappedFile
,解析解析见:MappedFile#appendMessage(...)
第 69 至 80 行 :
MappedFile
 已满,创建新的,再次插入消息。
第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService
第 143 至 173 行 :
Broker
 主从同步。后面的文章会详细解析
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  MQ java RocketMQ