
RocketMQ Source Code Analysis: Message Storage

2018-02-01 16:40

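Note

The putMessage method discussed below does not fully persist a message by itself. It only writes the message into Broker memory. A separate flush mechanism later writes the message to disk to make it durable.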
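The difference between "written to memory" and "flushed to disk" can be illustrated with plain Java NIO. The following is a minimal sketch and not RocketMQ code: the class name MappedWriteDemo and the helper roundTrip are hypothetical. It writes bytes through a memory mapping (the data lands in the mapped region first) and then calls force(), which asks the operating system to write the changes to the storage device.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedWriteDemo {

    /** Writes payload through a memory mapping, forces it to disk, and reads the file back. */
    static byte[] roundTrip(byte[] payload) {
        try {
            Path file = Files.createTempFile("mapped-demo", ".log");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping in READ_WRITE mode grows the file to the mapped size.
                MappedByteBuffer buffer =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
                buffer.put(payload); // step 1: the bytes are only in the mapped memory region
                buffer.force();      // step 2: explicit flush to the storage device
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] back = roundTrip("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(back, StandardCharsets.UTF_8));
    }
}
```

In this sketch the two steps happen back to back. In the Broker they are decoupled: putMessage performs only the first step, and a flush service performs the second.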

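Objects involved

Messages are stored through the CommitLog object and end up in a memory-mapped file, MappedFile. Along the way, the file that will finally hold the message has to be obtained from MappedFileQueue.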

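A CommitLog corresponds to exactly one MappedFileQueue, which holds multiple MappedFile instances. This is similar to keeping messages in different documents under one folder: when one document is full, a new one is created. The default file size is 1 GB. File naming: fileName[n] = fileName[n-1] + mappedFileSize. An entry stored in a MappedFile is one of two kinds: MESSAGE, or BLANK (padding produced when the file does not have enough space left).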
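The naming rule can be sketched as follows. This is an illustration only: it assumes that a file name is the file's start offset rendered as a 20-digit zero-padded decimal number, and the class and method names (CommitLogFileNames, fileNameFor, nextFileName) are hypothetical.

```java
import java.util.Locale;

public class CommitLogFileNames {

    /** Default file size mentioned in the text: 1 GB. */
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    /** Assumption: the name is the start offset, zero-padded to 20 digits. */
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    /** fileName[n] = fileName[n-1] + mappedFileSize */
    static String nextFileName(String previousName, long mappedFileSize) {
        return fileNameFor(Long.parseLong(previousName) + mappedFileSize);
    }

    public static void main(String[] args) {
        String name = fileNameFor(0);
        for (int i = 0; i < 3; i++) {
            System.out.println(name);
            name = nextFileName(name, MAPPED_FILE_SIZE);
        }
    }
}
```

Because each name encodes the start offset, the file that contains a given physical offset can be located by arithmetic alone, without an extra index.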

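Important fields in the CommitLog storage layout of a message: msgLen, magicCode, BodyCRC (checksum of the message body), QueueId, QueueOffset, PhysicalOffset (the message's actual physical position in the sequentially written CommitLog), BornTimeStamp, StoreTimeStamp, BodyLength + Body, TopicLength + Topic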
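To make the length-prefixed layout concrete, here is a simplified sketch that encodes only the fields listed above into a ByteBuffer and decodes them again. It is not the real on-disk format: the real record contains further fields, and the field widths, the magic value, and all names here (SimplifiedRecordCodec, MAGIC, Decoded) are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class SimplifiedRecordCodec {

    static final int MAGIC = 0x5A5A5A5A; // hypothetical marker value, not RocketMQ's

    static final class Decoded {
        int msgLen;
        int queueId;
        long queueOffset;
        long physicalOffset;
        byte[] body;
        String topic;
    }

    static int crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return (int) crc.getValue();
    }

    static ByteBuffer encode(int queueId, long queueOffset, long physicalOffset,
                             long bornTimestamp, long storeTimestamp,
                             byte[] body, String topic) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        if (topicBytes.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("topic too long for a 1-byte length prefix");
        }
        // Fixed-width fields, then length-prefixed body and topic.
        int msgLen = 4 + 4 + 4 + 4 + 8 + 8 + 8 + 8 + 4 + body.length + 1 + topicBytes.length;
        ByteBuffer buf = ByteBuffer.allocate(msgLen);
        buf.putInt(msgLen);
        buf.putInt(MAGIC);
        buf.putInt(crc32(body));
        buf.putInt(queueId);
        buf.putLong(queueOffset);
        buf.putLong(physicalOffset);
        buf.putLong(bornTimestamp);
        buf.putLong(storeTimestamp);
        buf.putInt(body.length);
        buf.put(body);
        buf.put((byte) topicBytes.length);
        buf.put(topicBytes);
        buf.flip();
        return buf;
    }

    static Decoded decode(ByteBuffer record) {
        ByteBuffer buf = record.duplicate(); // leave the caller's position untouched
        Decoded d = new Decoded();
        d.msgLen = buf.getInt();
        if (buf.getInt() != MAGIC) {
            throw new IllegalStateException("unexpected magic code");
        }
        int storedCrc = buf.getInt();
        d.queueId = buf.getInt();
        d.queueOffset = buf.getLong();
        d.physicalOffset = buf.getLong();
        buf.getLong(); // bornTimestamp, skipped in this sketch
        buf.getLong(); // storeTimestamp, skipped in this sketch
        d.body = new byte[buf.getInt()];
        buf.get(d.body);
        if (crc32(d.body) != storedCrc) {
            throw new IllegalStateException("body CRC mismatch");
        }
        byte[] topicBytes = new byte[buf.get()];
        buf.get(topicBytes);
        d.topic = new String(topicBytes, StandardCharsets.UTF_8);
        return d;
    }
}
```

Placing msgLen first lets a reader skip from one record to the next without parsing the body, and the body CRC lets it detect a corrupted record.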

Sequence diagram: CommitLog storing a message



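CommitLog#putMessage() walkthrough:

1. Set some basic fields (store timestamp, body CRC). For non-transactional messages and transaction-commit messages, delayed delivery is handled first: the topic and queueId are replaced with the schedule topic and the queue for the delay level, and the real ones are backed up in the message properties.

2. Get the last mapped file from MappedFileQueue.

3. Acquire the write lock for putting the message into memory (lockForPutMessage()).

4. Inside the try block, check the mapped file again: if no file exists yet or the last one is full, create a new one. The check is done under the lock because, with concurrent writers, the file may have filled up in the meantime. Then call MappedFile#appendMessage() on that file to store the message (this does not include flushing to disk). On PUT_OK, break out of the switch statement and continue. On END_OF_FILE, create a new file and append again. In any other case, handle the failure and return a PutMessageResult with the matching status.

5. In the finally block, release the write lock with releasePutMessageLock().

6. If the time spent inside the lock exceeds 500 ms, log a warning. Then update the per-topic statistics.

7. Flush the message to disk, synchronously or asynchronously, by flush or by commit. In the asynchronous case the corresponding service thread is woken up: flushCommitLogService.wakeup() if the transient store pool is disabled, otherwise commitLogService.wakeup().

8. If the BrokerRole is SYNC_MASTER, also replicate the message to the slave node.

9. Return putMessageResult.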

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    // Transaction related (TODO: yet to be read)
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;

    // Get the mapped file to write to
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // Acquire the write lock
    lockForPutMessage(); //spin...
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        // Create a mapped file if none exists or the last one is full
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        // Append the message to the mappedFile
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE: // End of file reached: get a new mapped file and append again
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // Release the write lock
        releasePutMessageLock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    // Unlock the mapped file that was just filled, if warm-up is enabled
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // Sync or async; flush or commit
    GroupCommitRequest request = null;
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (msg.isWaitStoreMsgOK()) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                    + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup(); // important: wake up the commitLog thread to flush
        } else {
            commitLogService.wakeup();
        }
    }

    // Synchronous double write: if this is a SYNC_MASTER, replicate to the slave (TODO: yet to be read: data sync)
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (msg.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                if (null == request) {
                    request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                }
                service.putRequest(request);

                service.getWaitNotifyObject().wakeupAll();

                boolean flushOK =
                    // TODO
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                        + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

    return putMessageResult;
}
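The core of the locked section, "append, and on END_OF_FILE roll over to a new file and append again", can be reduced to the following self-contained sketch. It is a simplified model and not the RocketMQ implementation: segments only track positions instead of holding bytes, a ReentrantLock stands in for lockForPutMessage(), and all names (RollingAppendSketch, Segment, put) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class RollingAppendSketch {

    enum Status { PUT_OK, END_OF_FILE }

    /** A fixed-size segment that only tracks how many bytes were written. */
    static final class Segment {
        final long fromOffset;
        final int capacity;
        int wrotePosition;

        Segment(long fromOffset, int capacity) {
            this.fromOffset = fromOffset;
            this.capacity = capacity;
        }

        boolean isFull() {
            return wrotePosition == capacity;
        }

        Status append(int size) {
            if (wrotePosition + size > capacity) {
                wrotePosition = capacity; // the remainder becomes BLANK padding
                return Status.END_OF_FILE;
            }
            wrotePosition += size;
            return Status.PUT_OK;
        }
    }

    private final List<Segment> segments = new ArrayList<>();
    private final ReentrantLock putLock = new ReentrantLock();
    private final int segmentSize;

    RollingAppendSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Returns the last segment, creating a new one if none exists or the last is full. */
    private Segment lastOrNew() {
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.isFull()) {
            long from = (last == null) ? 0 : last.fromOffset + segmentSize;
            last = new Segment(from, segmentSize);
            segments.add(last);
        }
        return last;
    }

    /** Appends a message of the given size and returns its physical offset. */
    long put(int size) {
        if (size > segmentSize) {
            throw new IllegalArgumentException("message larger than a segment");
        }
        putLock.lock();
        try {
            Segment segment = lastOrNew(); // checked under the lock
            long offset = segment.fromOffset + segment.wrotePosition;
            if (segment.append(size) == Status.END_OF_FILE) {
                segment = lastOrNew();     // roll over and write again
                offset = segment.fromOffset + segment.wrotePosition;
                segment.append(size);
            }
            return offset;
        } finally {
            putLock.unlock();
        }
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With a segment size of 10, two puts of 6 bytes land at offsets 0 and 10: the second message does not fit into the first segment, so the rest of that segment is padded and the message goes to the start of the next one.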


MappedFileQueue#getLastMappedFile()

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1; // Start offset of the file to create; -1 means do not create
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast == null) { // No mapped file exists yet
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    if (mappedFileLast != null && mappedFileLast.isFull()) { // The last file is full
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset != -1 && needCreate) { // Create the file
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
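The createOffset calculation is the part of this method that is easiest to misread, so here it is isolated as a small worked example. The class CreateOffsetSketch and its parameter names are hypothetical. The arithmetic is taken from the method above, with a null lastFileFromOffset standing for "no mapped file exists yet".

```java
public class CreateOffsetSketch {

    /**
     * Returns the start offset of the file to create, or -1 if no file needs to be created.
     *
     * @param lastFileFromOffset start offset of the last file, or null if there is no file
     * @param lastFileFull       whether the last file is full
     * @param startOffset        offset requested by the caller
     * @param mappedFileSize     size of every mapped file
     */
    static long createOffset(Long lastFileFromOffset, boolean lastFileFull,
                             long startOffset, long mappedFileSize) {
        if (lastFileFromOffset == null) {
            // Round down to a multiple of the file size so files stay aligned.
            return startOffset - (startOffset % mappedFileSize);
        }
        if (lastFileFull) {
            // The next file starts right where the full one ends.
            return lastFileFromOffset + mappedFileSize;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(createOffset(null, false, 2500, 1024));
        System.out.println(createOffset(2048L, true, 0, 1024));
        System.out.println(createOffset(2048L, false, 0, 1024));
    }
}
```

With a file size of 1024, a requested start offset of 2500 is rounded down to 2048, a full file starting at 2048 is followed by one starting at 3072, and a last file that still has room yields -1.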


FlushCommitLogService



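In FlushRealTimeService, if the Broker fails and goes down while flushing is in progress, a flush is forced. This preserves the integrity and correctness of the data and avoids leaving data unflushed.

Performance comparison: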



MappedFile: flushing to disk (how data is actually written to disk)





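To improve flush performance, private boolean isAbleToFlush(final int flushLeastPages) decides whether a flush is actually performed. A flush is allowed when any one of the following conditions holds:

1. The mapped file is full

2. flushLeastPages > 0 and the unflushed part spans at least flushLeastPages pages

3. flushLeastPages = 0 and there is newly written data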

/**
 * flush
 *
 * @param flushLeastPages minimum number of pages to flush
 * @return The current flushed position
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

/**
 * Whether a flush can be performed. Any one of the following conditions is sufficient:
 * 1. The mapped file is full
 * 2. flushLeastPages > 0 && the unflushed part spans at least flushLeastPages pages
 * 3. flushLeastPages = 0 && there is newly written data
 *
 * @param flushLeastPages minimum number of pages to flush
 * @return whether a flush can be performed
 */
private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();
    int write = getReadPosition();

    if (this.isFull()) {
        return true;
    }

    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }

    return write > flush;
}
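The page arithmetic in isAbleToFlush can be checked with a few concrete numbers. The sketch below restates the condition as a pure function so it can run on its own. It assumes OS_PAGE_SIZE is 4096 bytes, and the class name FlushConditionSketch and the explicit parameters are hypothetical.

```java
public class FlushConditionSketch {

    static final int OS_PAGE_SIZE = 4096; // assumption: 4 KB pages

    static boolean isAbleToFlush(int flushed, int written, int fileSize, int flushLeastPages) {
        if (written == fileSize) {
            return true; // condition 1: the file is full
        }
        if (flushLeastPages > 0) {
            // condition 2: compare page indices, not byte counts
            return (written / OS_PAGE_SIZE) - (flushed / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return written > flushed; // condition 3: any new data
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        // One byte short of four pages: not enough for flushLeastPages = 4.
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE - 1, fileSize, 4));
        // Exactly four pages: flush allowed.
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, fileSize, 4));
        // A single unflushed byte that crosses a page boundary also counts as one page.
        System.out.println(isAbleToFlush(OS_PAGE_SIZE - 1, OS_PAGE_SIZE, fileSize, 1));
    }
}
```

Because condition 2 uses integer division on both positions, it measures how many page boundaries lie between the flushed and the written position, which is only an approximation of the amount of unflushed data.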


The check that the commit path uses to improve performance is almost identical to the flush check: one takes flushLeastPages, the other commitLeastPages.