RocketMQ源码分析之Message存储
2018-02-01 16:40
363 查看
小提示
后文中的putMessage并不是完全将消息存储起来,这里是将消息存储在了Broker内存中,在后面还会有一个消息刷盘的机制,将消息存储到磁盘中,做持久化处理。
对象说明
我们存储的消息是以CommitLog对象放置,存在映射文件MappedFile中,过程中需要从MappedFileQueue中获得这个最终存储消息的文件。一个消息存储对象的CommitLog只对应一个MappedFileQueue,对应多个MappedFile,就类似一种消息存在特定文件夹下面的不同文档中,一个文档满了就再新建一个文档,默认大小1GB。文档命名方式:fileName
=fileName[n-1] + mappedFileSize.CommitLog消息对象有两种存储在MappedFile中,一种是MESSAGE、另一种就是BLANK(文件存储空间不够时产生的空白占位)。
message中的几个重要的CommitLog存储结构:msgLen、magicCode、BodyCRC(消息内容校验)、QueueId、QueueOffset、PhysicalOffset(在 CommitLog 的实际物理顺序存储位置)、BornTimeStamp、StoreTimeStamp、BodyLength + Body、TopicLength + Topic
CommitLog存储消息时序图
CommitLog#putMessage()解析:先是一些基本的格式内容Set,对于事务消息先会进行处理,然后就是获取映射文件【如果还不存在或者LastMappedFile满了,就新建一个文件】,然后获取将消息写入内存的写入锁【lockForPutMessage()】,然后执行try语句【这里面会再次执行获取mappedFile文件操作,因为并发的出现,可能出现文件又满了的情况需要重新新建一个文件;然后就是调用获取到的MappedFile#putMessage()方法进行消息存储,这里不包括消息刷盘存储;如果存储成功就跳出switch语句进行后续操作,如果不成功相应处理然后返回PutMessageResult】,然后在finally里面释放写入锁releasePutMessageLock();如果消息存储成功会日志记录存储消息的时间等记录,然后就是对Topic进行统计,后面就是进行消息刷盘,有同步||异步,flush||commit方式,如果是异步 flush,里面会进行唤醒commitLog线程,进行flush(flushCommitLogService.wakeup()||commitLogService.wakeup()),如果BrokerRole是Master类型,那么还要做同步到从节点的操作,最后
返回putMessageResult
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { 2: // Set the storage time 3: msg.setStoreTimestamp(System.currentTimeMillis()); 4: // Set the message body BODY CRC (consider the most appropriate setting 5: // on the client) 6: msg.setBodyCRC(UtilAll.crc32(msg.getBody())); 7: // Back to Results 8: AppendMessageResult result = null; 9: 10: StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); 11: 12: String topic = msg.getTopic(); 13: int queueId = msg.getQueueId(); 14: 15: // 事务相关 TODO 待读:事务相关 16: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 17: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 18: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 19: // Delay Delivery 20: if (msg.getDelayTimeLevel() > 0) { 21: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 22: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 23: } 24: 25: topic = ScheduleMessageService.SCHEDULE_TOPIC; 26: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 27: 28: // Backup real topic, queueId 29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 30: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 31: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 32: 33: msg.setTopic(topic); 34: msg.setQueueId(queueId); 35: } 36: } 37: 38: long eclipseTimeInLock = 0; 39: 40: // 获取写入映射文件 41: MappedFile unlockMappedFile = null; 42: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); 43: 44: // 获取写入锁 45: lockForPutMessage(); //spin... 
46: try { 47: 4000 long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); 48: this.beginTimeInLock = beginLockTimestamp; 49: 50: // Here settings are stored timestamp, in order to ensure an orderly 51: // global 52: msg.setStoreTimestamp(beginLockTimestamp); 53: 54: // 当不存在映射文件时,进行创建 55: if (null == mappedFile || mappedFile.isFull()) { 56: mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise 57: } 58: if (null == mappedFile) { 59: log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 60: beginTimeInLock = 0; 61: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); 62: } 63: 64: // 存储消息到mappedFile 65: result = mappedFile.appendMessage(msg, this.appendMessageCallback); 66: switch (result.getStatus()) { 67: case PUT_OK: 68: break; 69: case END_OF_FILE: // 当文件尾时,获取新的映射文件,并进行插入 70: unlockMappedFile = mappedFile; 71: // Create a new file, re-write the message 72: mappedFile = this.mappedFileQueue.getLastMappedFile(0); 73: if (null == mappedFile) { 74: // XXX: warn and notify me 75: log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); 76: beginTimeInLock = 0; 77: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); 78: } 79: result = mappedFile.appendMessage(msg, this.appendMessageCallback); 80: break; 81: case MESSAGE_SIZE_EXCEEDED: 82: case PROPERTIES_SIZE_EXCEEDED: 83: beginTimeInLock = 0; 84: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); 85: case UNKNOWN_ERROR: 86: beginTimeInLock = 0; 87: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 88: default: 89: beginTimeInLock = 0; 90: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); 91: } 92: 93: eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; 94: beginTimeInLock = 0; 95: } finally { 96: // 释放写入锁 97: 
releasePutMessageLock(); 98: } 99: 100: if (eclipseTimeInLock > 500) { 101: log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); 102: } 103: 104: // 105: if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { 106: this.defaultMessageStore.unlockMappedFile(unlockMappedFile); 107: } 108: 109: PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); 110: 111: // Statistics 112: storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); 113: storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); 114: 115: // 进行同步||异步 flush||commit 116: GroupCommitRequest request = null; 117: // Synchronization flush 118: if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { 119: final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; 120: if (msg.isWaitStoreMsgOK()) { 121: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 122: service.putRequest(request); 123: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 124: if (!flushOK) { 125: log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() 126: + " client address: " + msg.getBornHostString()); 127: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 128: } 129: } else { 130: service.wakeup(); 131: } 132: } 133: // Asynchronous flush 134: else { 135: if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 136: flushCommitLogService.wakeup(); // important:唤醒commitLog线程,进行flush 137: } else { 138: commitLogService.wakeup(); 139: } 140: } 141: 142: // Synchronous write double 如果是同步Master,同步到从节点 // TODO 待读:数据同步 143: if 
(BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { 144: HAService service = this.defaultMessageStore.getHaService(); 145: if (msg.isWaitStoreMsgOK()) { 146: // Determine whether to wait 147: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 148: if (null == request) { 149: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 150: } 151: service.putRequest(request); 152: 153: service.getWaitNotifyObject().wakeupAll(); 154: 155: boolean flushOK = 156: // TODO 157: request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 158: if (!flushOK) { 159: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " 160: + msg.getTags() + " client address: " + msg.getBornHostString()); 161: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 162: } 163: } 164: // Slave problem 165: else { 166: // Tell t ddcd he producer, slave not available 167: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 168: } 169: } 170: } 171: 172: return putMessageResult; 173: }
MappedFileQueue#getLastMappedFile()
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) { 2: long createOffset = -1; // 创建文件开始offset。-1时,不创建 3: MappedFile mappedFileLast = getLastMappedFile(); 4: 5: if (mappedFileLast == null) { // 一个映射文件都不存在 6: createOffset = startOffset - (startOffset % this.mappedFileSize); 7: } 8: 9: if (mappedFileLast != null && mappedFileLast.isFull()) { // 最后一个文件已满 10: createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize; 11: } 12: 13: if (createOffset != -1 && needCreate) { // 创建文件 14: String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); 15: String nextNextFilePath = this.storePath + File.separator 16: + UtilAll.offset2FileName(createOffset + this.mappedFileSize); 17: MappedFile mappedFile = null; 18: 19: if (this.allocateMappedFileService != null) { 20: mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, 21: nextNextFilePath, this.mappedFileSize); 22: } else { 23: try { 24: mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); 25: } catch (IOException e) { 26: log.error("create mappedFile exception", e); 27: } 28: } 29: 30: if (mappedFile != null) { 31: if (this.mappedFiles.isEmpty()) { 32: mappedFile.setFirstCreateInQueue(true); 33: } 34: this.mappedFiles.add(mappedFile); 35: } 36: 37: return mappedFile; 38: } 39: 40: return mappedFileLast; 41: }
FlushCommitLogService
在flushRealTimeService中,采用这种方式进行刷盘,如果过程中出现了Broker出故障宕机情况,那么会强制flush保证数据的完整性与正确性,避免有未刷盘的数据。
性能对比:
MappedFile#落盘(真正写入到磁盘的方式)
为了提高flush性能,利用private boolean isAbleToFlush(final int flushLeastPages)判断是否执行flush操作:是否能够flush。满足如下条件任意条件:
1. 映射文件已经写满
2. flushLeastPages>0&&未flush部分超过flushLeastPages
3. flushLeastPages=0&&有新写入部分
1: /** 2: * flush 3: * 4: * @param flushLeastPages flush最小页数 5: * @return The current flushed position 6: */ 7: public int flush(final int flushLeastPages) { 8: if (this.isAbleToFlush(flushLeastPages)) { 9: if (this.hold()) { 10: int value = getReadPosition(); 11: 12: try { 13: //We only append data to fileChannel or mappedByteBuffer, never both. 14: if (writeBuffer != null || this.fileChannel.position() != 0) { 15: this.fileChannel.force(false); 16: } else { 17: this.mappedByteBuffer.force(); 18: } 19: } catch (Throwable e) { 20: log.error("Error occurred when force data to disk.", e); 21: } 22: 23: this.flushedPosition.set(value); 24: this.release(); 25: } else { 26: log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); 27: this.flushedPosition.set(getReadPosition()); 28: } 29: } 30: return this.getFlushedPosition(); 31: } 32: 33: /** 34: * 是否能够flush。满足如下条件任意条件: 35: * 1. 映射文件已经写满 36: * 2. flushLeastPages > 0 && 未flush部分超过flushLeastPages 37: * 3. flushLeastPages = 0 && 有新写入部分 38: * 39: * @param flushLeastPages flush最小分页 40: * @return 是否能够写入 41: */ 42: private boolean isAbleToFlush(final int flushLeastPages) { 43: int flush = this.flushedPosition.get(); 44: int write = getReadPosition(); 45: 46: if (this.isFull()) { 47: return true; 48: } 49: 50: if (flushLeastPages > 0) { 51: return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; 52: } 53: 54: return write > flush; 55: }
commit方式为了提高性能所做的判断与flush判断方式几乎一样,一个是flushLeastPages,一个是commitLeastPages
相关文章推荐
- 源码分析RocketMQ消息消费机制----消费者拉取消息机制
- RocketMQ 源码分析 高可用
- RocketMQ 源码分析(一)
- RocketMQ源码分析----消息存储
- RocketMQ 源码分析 —— Filtersrv
- 源码分析RocketMQ消息过滤机制上篇-----消息消费服务端过滤与TAG模式过滤实现
- RocketMQ 源码分析 事务消息
- RocketMQ源码:有序消息分析
- RocketMQ client客户端模块源码分析一(生产者)
- 源码分析RocketMQ消息消费机制----消费端消息负载均衡机制与重新分布
- rocketmq-4.0.0 和4.1.0 源码比对分析
- RocketMQ 源码分析 定时消息与消息重试
- rocketmq-remoting源码分析NettyRemotingClient
- Rocketmq之namesrv启动流程源码详解分析
- RocketMQ 源码分析Message 拉取与消费(下)
- RocketMQ 源码分析 —— Message 存储
- RocketMQ 源码分析 —— 高可用
- RocketMQ源码分析之Message消费与拉取(下Consume的拉取消费)
- RocketMQ源码分析之顺序消费
- rocketmq-remoting 源码分析NettyRemotingServer