RocketMQ 源码分析 —— Message 存储
2018-03-27 15:45
776 查看
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-store/
文件命名方式:fileName
= fileName[n - 1] + mappedFileSize。在
BLANK :文件不足以存储消息时的空白占位。
说明 :存储消息,并返回存储结果。
第 2 行 :设置存储时间等。
第 16 至 36 行 :事务消息相关,暂未了解。
第 45 & 97 行 :获取锁与释放锁。
第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
第 55 至 62 行 :获取
第 65 行 :插入消息到
第 69 至 80 行 :
第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
第 143 至 173 行 :
1、概述
本文接《RocketMQ 源码分析 —— Message 发送与接收》。 主要解析CommitLog存储消息部分。
2、CommitLog 结构
CommitLog、
MappedFileQueue、
MappedFile的关系如下:
CommitLog:
MappedFileQueue:
MappedFile= 1 : 1 : N。反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlogYunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296 |
CommitLog、
MappedFileQueue、
MappedFile的定义如下:
MappedFile:00000000000000000000、00000000001073741824、00000000002147483648等文件。
MappedFileQueue:
MappedFile所在的文件夹,对
MappedFile进行封装成文件队列,对上层提供可无限使用的文件容量。每个
MappedFile统一文件大小。
文件命名方式:fileName
= fileName[n - 1] + mappedFileSize。在
CommitLog里默认为 1GB。
CommitLog:针对
MappedFileQueue的封装使用。
CommitLog目前存储在
MappedFile有两种内容类型:MESSAGE :消息。
BLANK :文件不足以存储消息时的空白占位。
CommitLog存储在
MappedFile的结构:
MESSAGE在
CommitLog存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | MsgLen | 消息总长度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 消息内容CRC | Int | 4 |
4 | QueueId | 消息队列编号 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 消息队列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog的顺序存储位置。 | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成消息时间戳 | Long | 8 |
10 | BornHost | 生效消息的地址+端口 | Long | 8 |
11 | StoreTimestamp | 存储消息时间戳 | Long | 8 |
12 | StoreHost | 存储消息的地址+端口 | Long | 8 |
13 | ReconsumeTimes | 重新消费消息次数 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 内容长度 + 内容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic长度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | Short + Bytes | 2 + PropertiesLength |
BLANK在
CommitLog存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | maxBlank | 空白长度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
3、CommitLog 存储消息
CommitLog#putMessage(...)
1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) { 2: // Set the storage time 3: msg.setStoreTimestamp(System.currentTimeMillis()); 4: // Set the message body BODY CRC (consider the most appropriate setting 5: // on the client) 6: msg.setBodyCRC(UtilAll.crc32(msg.getBody())); 7: // Back to Results 8: AppendMessageResult result = null; 9: 10: StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); 11: 12: String topic = msg.getTopic(); 13: int queueId = msg.getQueueId(); 14: 15: // 事务相关 TODO 待读:事务相关 16: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 17: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 18: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 19: // Delay Delivery 20: if (msg.getDelayTimeLevel() > 0) { 21: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 22: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 23: } 24: 25: topic = ScheduleMessageService.SCHEDULE_TOPIC; 26: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 27: 28: // Backup real topic, queueId 29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 30: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 31: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 32: 33: msg.setTopic(topic); 34: msg.setQueueId(queueId); 35: } 36: } 37: 38: long eclipseTimeInLock = 0; 39: 40: // 获取写入映射文件 41: MappedFile unlockMappedFile = null; 42: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); 43: 44: // 获取写入锁 45: lockForPutMessage(); //spin... 46: try { 47: long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); 48: this.beginTimeInLock = beginLockTimestamp; 49: 50: // Here settings are stored timestamp, in order to ensure an orderly 51: // global 52: msg.setStoreTimestamp(beginLockTimestamp); 53: 54: // 当不存在映射文件时,进行创建 55: if (null == mappedFile || mappedFile.isFull()) { 56: mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise 57: } 58: if (null == mappedFile) { 59: log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 60: beginTimeInLock = 0; 61: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); 62: } 63: 64: // 存储消息 65: result = mappedFile.appendMessage(msg, this.appendMessageCallback); 66: switch (result.getStatus()) { 67: case PUT_OK: 68: break; 69: case END_OF_FILE: // 当文件尾时,获取新的映射文件,并进行插入 70: unlockMappedFile = mappedFile; 71: // Create a new file, re-write the message 72: mappedFile = this.mappedFileQueue.getLastMappedFile(0); 73: if (null == mappedFile) { 74: // XXX: warn and notify me 75: log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 76: beginTimeInLock = 0; 77: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); 78: } 79: result = mappedFile.appendMessage(msg, this.appendMessageCallback); 80: break; 81: case MESSAGE_SIZE_EXCEEDED: 82: case PROPERTIES_SIZE_EXCEEDED: 83: beginTimeInLock = 0; 84: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); 85: case UNKNOWN_ERROR: 86: beginTimeInLock = 0; 87: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 88: default: 89: beginTimeInLock = 0; 90: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 91: } 92: 93: eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; 94: beginTimeInLock = 0; 95: } finally { 96: // 释放写入锁 97: releasePutMessageLock(); 98: } 99: 100: if (eclipseTimeInLock > 500) {101: log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);102: }103: 104: // 105: if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {106: this.defaultMessageStore.unlockMappedFile(unlockMappedFile);107: }108: 109: PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);110: 111: // Statistics112: storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();113: storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());114: 115: // 进行同步||异步 flush||commit116: GroupCommitRequest request = null;117: // Synchronization flush118: if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {119: final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;120: if (msg.isWaitStoreMsgOK()) {121: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());122: service.putRequest(request);123: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());124: if (!flushOK) {125: log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()126: + " client address: " + msg.getBornHostString());127: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);128: }129: } else {130: service.wakeup();131: }132: }133: // Asynchronous flush134: else {135: if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {136: flushCommitLogService.wakeup(); // important:唤醒commitLog线程,进行flush137: } else {138: commitLogService.wakeup();139: }140: }141: 142: // Synchronous write double 如果是同步Master,同步到从节点 // TODO 待读:数据同步143: if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {144: HAService service = this.defaultMessageStore.getHaService();145: if (msg.isWaitStoreMsgOK()) {146: // Determine whether to wait147: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {148: if (null == request) {149: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());150: }151: service.putRequest(request);152: 153: service.getWaitNotifyObject().wakeupAll();154: 155: boolean flushOK =156: // TODO157: request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());158: if (!flushOK) {159: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "160: + msg.getTags() + " client address: " + msg.getBornHostString());161: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);162: }163: }164: // Slave problem165: else {166: // Tell the producer, slave not available167: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);168: }169: }170: }171: 172: return putMessageResult;173: } |
第 2 行 :设置存储时间等。
第 16 至 36 行 :事务消息相关,暂未了解。
第 45 & 97 行 :获取锁与释放锁。
第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
第 55 至 62 行 :获取
MappedFile,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。
第 65 行 :插入消息到
MappedFile,解析解析见:MappedFile#appendMessage(...)。
第 69 至 80 行 :
MappedFile已满,创建新的,再次插入消息。
第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
第 143 至 173 行 :
Broker主从同步。后面的文章会详细解析
相关文章推荐
- RocketMQ 源码分析Message 存储
- RocketMQ 源码分析Message 拉取与消费(下)
- RocketMQ源码解析:Message存储
- RocketMQ源码分析----消息存储
- 阿里消息队列中间件 RocketMQ 源码分析 —— Message 拉取与消费(上)
- RocketMQ 源码分析Message 顺序发送与消费
- RocketMQ源码分析之Message消费与拉取(下Consume的拉取消费)
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- 源码分析RocketMQ消息消费机制----消费者拉取消息机制
- JDK源码分析——研究 Hash 存储机制
- (转)android的消息处理机制(图+源码分析)——Looper,Handler,Message
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- 【安卓笔记】Handler,Looper,MessageQueue,Message源码分析
- Handler的创建、Message的处理与Looper的作用——源码分析
- JDK源码学习之HashMap (一) : 底层存储结构分析
- PHP源码分析之变量的存储过程分解
- asp.net mvc源码分析-Controllerl篇 TempData数据存储
- Docker源码分析(十一):镜像存储
- android的消息处理机制(图+源码分析)——Looper,Handler,Message