RocketMQ Message Storage
2017-12-15 08:31
1. The client sends a message to any broker, and the broker receives it in SendMessageProcessor:

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
            final RemotingCommand request, //
            final SendMessageContext sendMessageContext, //
            final SendMessageRequestHeader requestHeader) throws RemotingCommandException

2. After a series of checks, the processor calls

    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

which enters DefaultMessageStore and runs

    PutMessageResult result = this.commitLog.putMessage(msg);

This reaches the CommitLog class, which performs the actual storage.
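3. putMessage takes an object lock here to preserve ordering. To store a message it first needs the file object to write to, so it looks up the newest file. Writes are sequential: a new file is created only when the current one is full. Each file is 1 GB by default, and the file name is the offset at which that file starts.

    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();

It then appends to that file and gets back the result of the operation:

    AppendMessageResult result = mapedFile.appendMessage(msg, this.appendMessageCallback);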
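To make the naming rule concrete, here is a minimal sketch of how a global offset maps to a file name and to a position inside that file. It is not the RocketMQ code: the class and method names are made up for illustration, and the 20-digit zero-padded width is an assumption that the article itself does not state.

```java
import java.util.Locale;

/** Hypothetical helper, for illustration only. */
class CommitLogNamesSketch {
    // 1 GB, the default file size mentioned in the article
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    /** File name for a file starting at startOffset (assumed: 20 digits, zero padded). */
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    /** Start offset of the file that contains the given global offset. */
    static long fileStartFor(long globalOffset) {
        return globalOffset - (globalOffset % FILE_SIZE);
    }

    /** Position of the given global offset inside its file. */
    static int positionInFile(long globalOffset) {
        return (int) (globalOffset % FILE_SIZE);
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 5; // five bytes into the second file
        System.out.println(fileNameFor(fileStartFor(offset)));
        System.out.println(positionInFile(offset));
    }
}
```

Because every file has the same size, finding the file for an offset needs only a division, and zero padding keeps the names in offset order when sorted as strings.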
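4. MapedFile records the in-memory write offset wrotePostion, the offset actually flushed to the file committedPosition, the file object file, the file's starting global offset fileFromOffset (which is also the file name, fileName), and mappedByteBuffer, the virtual memory mapped onto the file. appendMessage takes a slice() of mappedByteBuffer to get a view of it, sets the view's position to the offset already written in memory, and appends after that point. The effect is the same as writing messages sequentially straight into the file.

    public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
        int currentPos = this.wrotePostion.get();
        if (currentPos < this.fileSize) {
            // The slice shares content with the mapping but has its own position
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result =
                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
            this.wrotePostion.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        // error path (file already full) not shown in this excerpt
    }

5. doAppend now writes the message data into memory. There are 20 segments of data to store in total; they are put into mappedByteBuffer, and an append result is assembled and returned. Before appending, it computes the length of the message to write and adds 8 bytes: 4 for the remaining-space field and 4 for the MAGICCODE magic number. These 8 bytes are reserved so that, when the file runs out of room, a marker can still be written to say that the file has ended. Many binary file formats carry a magic number of this kind to identify what the file is (a JPEG, for example). It then checks whether the file still has that much space. If not, it writes only those 8 bytes, leaves the rest of the space unwritten, and returns. The commit log file size defaults to 1 GB:

    // CommitLog file size,default is 1G
    private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;

and the maximum size of a single message defaults to 4 MB:

    // The maximum size of a single log file,default is 4M
    private int maxMessageSize = 1024 * 1024 * 4;

If writes were fast enough and messages large enough, with asynchronous flushing selected, the byteBuffer.position(currentPos); call could in principle throw. In practice this essentially never happens; it is only speculation on my part.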
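The append path in steps 4 and 5 can be sketched roughly as follows. This is a simplified illustration, not the CommitLog implementation: AppendSketch and its methods are hypothetical, a heap ByteBuffer stands in for the mapped file, the record layout is cut down to size, magic number and body, and both magic values are placeholders.

```java
import java.nio.ByteBuffer;

/** Hypothetical fixed-size log file, for illustration only. */
class AppendSketch {
    static final int MESSAGE_MAGIC = 0xAABBCCDD; // placeholder value (assumption)
    static final int BLANK_MAGIC = 0xBBCCDDEE;   // placeholder value (assumption)
    static final int END_MARKER_BYTES = 4 + 4;   // remaining-space field + magic number
    static final int END_OF_FILE = -1;

    private final ByteBuffer file; // heap buffer standing in for mappedByteBuffer
    private int wrotePosition = 0;

    AppendSketch(int fileSize) {
        if (fileSize < END_MARKER_BYTES) {
            throw new IllegalArgumentException("file too small for the end marker");
        }
        this.file = ByteBuffer.allocate(fileSize);
    }

    /** Returns the number of bytes written, or END_OF_FILE if the caller must roll to a new file. */
    int append(byte[] body) {
        if (wrotePosition >= file.capacity()) {
            return END_OF_FILE; // file was already closed by an end marker
        }
        int msgLen = 4 + 4 + body.length; // size + magic + body
        int maxBlank = file.capacity() - wrotePosition;
        ByteBuffer view = file.slice();   // shares content, has its own position
        view.position(wrotePosition);
        if (msgLen + END_MARKER_BYTES > maxBlank) {
            // Not enough room: write only the 8-byte marker and give up the rest
            view.putInt(maxBlank);
            view.putInt(BLANK_MAGIC);
            wrotePosition += maxBlank;
            return END_OF_FILE;
        }
        view.putInt(msgLen);
        view.putInt(MESSAGE_MAGIC);
        view.put(body);
        wrotePosition += msgLen;
        return msgLen;
    }

    int wrotePosition() {
        return wrotePosition;
    }

    public static void main(String[] args) {
        AppendSketch file = new AppendSketch(32);
        System.out.println(file.append(new byte[8])); // fits
        System.out.println(file.append(new byte[8])); // does not fit with the marker reserved
    }
}
```

Reserving the 8 bytes on every write is what guarantees that the marker itself always fits, so a reader scanning the file can tell padding from a message.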
The end-of-file check in the append callback:

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    // Not enough room for this message plus the end-of-file marker
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetMsgStoreItemMemory(maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
        // 3 The remaining space may be any value
        //
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

Once the message has been written, the corresponding offset in topicQueueTable is incremented by one. The key is built from the topic and the queueId.

6. putMessage then checks the result status. If the end of the file has been reached, it creates a new file and runs the append again:

    switch (result.getStatus()) {
    case PUT_OK:
        break;
    case END_OF_FILE:
        unlockMapedFile = mapedFile;
        // Create a new file, re-write the message
        mapedFile = this.mapedFileQueue.getLastMapedFile();
        if (null == mapedFile) {
            // XXX: warn and notify me
            log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        result = mapedFile.appendMessage(msg, this.appendMessageCallback);
        break;
    // remaining cases not shown in this excerpt
    }

The work is done inside mapedFileQueue.getLastMapedFile, which checks whether the last file in the file list is full:
    if (mapedFileLast != null && mapedFileLast.isFull()) {
        createOffset = mapedFileLast.getFileFromOffset() + this.mapedFileSize;
    }

    public boolean isFull() {
        return this.fileSize == this.wrotePostion.get();
    }

7. Next the files are created, two at a time:

    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mapedFileSize);
    mapedFile = this.allocateMapedFileService.putRequestAndReturnMapedFile(nextFilePath, nextNextFilePath, this.mapedFileSize);

The creation request is put into a PriorityBlockingQueue:

    boolean offerOK = this.requestQueue.offer(nextReq);

and the caller then waits with a timeout:

    boolean waitOK = result.getCountDownLatch().await(WaitTimeOut, TimeUnit.MILLISECONDS);

The AllocateMapedFileService thread, started together with the service, keeps taking requests from this queue:

    while (!this.isStoped() && this.mmapOperation())

    req = this.requestQueue.take();

It creates the file:

    MapedFile mapedFile = new MapedFile(req.getFilePath(), req.getFileSize());

and, once creation succeeds, wakes up the request that was blocked:

    req.getCountDownLatch().countDown();
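The hand-off between the writer and the allocation thread in step 7 can be sketched as below. This is only an illustration of the queue-plus-latch pattern: AllocateSketch, Request and requestAndWait are hypothetical names, a byte array stands in for the mapped file, requests are ordered by path (an assumption), and unlike the real service the sketch does not keep the second, pre-allocated file for later use.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical allocation service, for illustration only. */
class AllocateSketch {
    /** One creation request; ordered by path so lower offsets are served first (assumption). */
    static final class Request implements Comparable<Request> {
        final String path;
        final CountDownLatch done = new CountDownLatch(1);
        volatile byte[] file; // stands in for the mapped file

        Request(String path) {
            this.path = path;
        }

        @Override
        public int compareTo(Request other) {
            return path.compareTo(other.path);
        }
    }

    private final BlockingQueue<Request> queue = new PriorityBlockingQueue<>();

    AllocateSketch() {
        Thread worker = new Thread(this::run, "allocate-sketch");
        worker.setDaemon(true); // do not keep the JVM alive
        worker.start();
    }

    /** Worker loop: take a request, "create" the file, release the waiter. */
    private void run() {
        try {
            while (true) {
                Request req = queue.take();
                req.file = new byte[16];
                req.done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues requests for the next two files and waits for the first. Returns null on timeout. */
    byte[] requestAndWait(String nextPath, String nextNextPath, long timeoutMillis) {
        Request first = new Request(nextPath);
        queue.offer(first);
        queue.offer(new Request(nextNextPath));
        try {
            return first.done.await(timeoutMillis, TimeUnit.MILLISECONDS) ? first.file : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        AllocateSketch service = new AllocateSketch();
        byte[] file = service.requestAndWait("00000000000000000000", "00000000001073741824", 2000);
        System.out.println(file != null ? "allocated" : "timed out");
    }
}
```

Asking for two files at once means the expensive creation of the following file overlaps with writes to the current one; the writer only ever blocks on the file it needs right now.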
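8. Decide how to flush to disk: synchronously or asynchronously.
In synchronous mode the request is handed straight to the group commit service:

    GroupCommitService.putRequest(request);

and the caller waits for it with a timeout:

    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

putRequest adds the request to the write list:

    this.requestsWrite.add(request);

The service thread keeps swapping the read and write lists:

    this.waitForRunning(0);
    this.doCommit();

and whenever it finds requests it commits up to their offsets.

    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }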
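The two-list swap is a small double-buffering trick, sketched here in isolation. The names (SwapListsSketch, putRequest, doCommit, flushedUpTo) are hypothetical, requests are reduced to the offset they wait for, a single flusher thread is assumed, and the actual disk flush is replaced by updating a counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical group-commit queue, for illustration only. */
class SwapListsSketch {
    private List<Long> requestsWrite = new ArrayList<>(); // filled by producers
    private List<Long> requestsRead = new ArrayList<>();  // drained by the flusher
    private long flushedUpTo = 0;

    /** Producer side: register the offset that must become durable. */
    synchronized void putRequest(long offset) {
        requestsWrite.add(offset);
    }

    /** Swaps the two lists; the lock is held only for three assignments. */
    private synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /**
     * Flusher side (assumed to be a single thread): swap, then serve the
     * read list without blocking producers. Returns the number of requests served.
     */
    int doCommit() {
        swapRequests();
        int served = 0;
        for (long offset : requestsRead) {
            if (offset > flushedUpTo) {
                flushedUpTo = offset; // a real implementation would flush to disk here
            }
            served++;
        }
        requestsRead.clear();
        return served;
    }

    long flushedUpTo() {
        return flushedUpTo;
    }

    public static void main(String[] args) {
        SwapListsSketch service = new SwapListsSketch();
        service.putRequest(100L);
        service.putRequest(250L);
        System.out.println(service.doCommit() + " requests, flushed up to " + service.flushedUpTo());
    }
}
```

Producers contend only for the brief add and swap, so a slow flush never holds up threads that are queuing new requests, and one flush can satisfy every request collected in the meantime.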
doCommit calls

    CommitLog.this.mapedFileQueue.commit(0);

The 0 is the minimum number of pages to flush (flushLeastPages); each page is OS_PAGE_SIZE = 1024 * 4 bytes. The queue uses the overall offset committedWhere to find the file that needs committing:

    MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);

and then commits it:

    int offset = mapedFile.commit(flushLeastPages);

This really writes the data to the file, releases the resource, and sets the committed offset:

    public int commit(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = this.wrotePostion.get();
                // Flush: memory to disk
                this.mappedByteBuffer.force();
                this.committedPosition.set(value);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
                this.committedPosition.set(this.wrotePostion.get());
            }
        }
        return this.getCommittedPosition();
    }

Afterwards the waiting request is released with req.wakeupCustomer(flushOK);. In asynchronous mode the FlushRealTimeService thread is notified and handles the flush on its own; the process is similar to the synchronous one.

9. Replication to the slave broker can be synchronous or asynchronous. Synchronous replication applies when

    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole())

The master broker opens port 10912 by default to synchronize messages with the slave.

    HAService service = this.defaultMessageStore.getHaService();

It submits a replication request, which triggers the service thread to swap requestsWrite and requestsRead, and then waits with a timeout:

    if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
        if (null == request) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        }
        service.putRequest(request);
        service.getWaitNotifyObject().wakeupAll();
        boolean flushOK =
                // TODO
                request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                    + msg.getTags() + " client address: " + msg.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
        }
    }
    // Slave problem
    else {
        // Tell the producer, slave not available
        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
    }

When the service starts, HAService starts the server-side connection listener. On the slave, HAClient connects to the master and keeps the connection. The slave listens for read events and the master for both read and write events. This part is written directly against NIO:
    public void start() {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haClient.start();
    }

Both ends send heartbeats every 5 s. The slave's HAClient reports its current offset:

    private boolean reportSlaveMaxOffset(final long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(8);
        this.reportOffset.putLong(maxOffset);
        this.reportOffset.position(0);
        this.reportOffset.limit(8);
        for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
            try {
                this.socketChannel.write(this.reportOffset);
                // ... (the excerpt ends here)

When the master's ReadSocketService receives this, it updates the two variables slaveRequestOffset and slaveAckOffset:

    int readSize = this.socketChannel.read(this.byteBufferRead);
    if (readSize > 0) {
        readSizeZeroTimes = 0;
        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
            // Use the last complete 8-byte offset in the buffer
            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
            long readOffset = this.byteBufferRead.getLong(pos - 8);
            this.processPostion = pos;
            HAConnection.this.slaveAckOffset = readOffset;
            if (HAConnection.this.slaveRequestOffset < 0) {
                HAConnection.this.slaveRequestOffset = readOffset;
                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
            }
            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }
        // ... (the excerpt ends here)

This then triggers this.groupTransferService.notifyTransferSome();. At this point, however, the wait condition is not yet satisfied, because the slave has not acknowledged the new data. The master first has to write the new offset and the message data to the channel:
    SelectMapedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
    if (selectResult != null) {
        int size = selectResult.getSize();
        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
        }
        long thisOffset = this.nextTransferFromWhere;
        this.nextTransferFromWhere += size;
        selectResult.getByteBuffer().limit(size);
        this.selectMapedBufferResult = selectResult;
        // Build Header
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(HEADER_SIZE);
        this.byteBufferHeader.putLong(thisOffset);
        this.byteBufferHeader.putInt(size);
        this.byteBufferHeader.flip();
        this.lastWriteOver = this.transferData();
    }

It assembles the header and the body, and at the end releases the memory held by the body. The body has no fixed size and can take up a lot of memory, so it has to be released every time. The header is a fixed 12 bytes, so it does not need to be released.

    private boolean transferData() throws Exception {
        int writeSizeZeroTimes = 0;
        // Write Header
        while (this.byteBufferHeader.hasRemaining()) {
            int writeSize = this.socketChannel.write(this.byteBufferHeader);
            // handling of writeSize is not shown in this excerpt
        }
        if (null == this.selectMapedBufferResult) {
            return !this.byteBufferHeader.hasRemaining();
        }
        writeSizeZeroTimes = 0;
        // Write Body
        if (!this.byteBufferHeader.hasRemaining()) {
            while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
                int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
                // handling of writeSize is not shown in this excerpt
            }
        }
        boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();
        if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
            // Body fully sent: release the buffer
            this.selectMapedBufferResult.release();
            this.selectMapedBufferResult = null;
        }
        return result;
    }

When the data reaches the slave, the message is first put into memory with HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);, and a separate thread flushes to disk periodically:
    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                    readSizeZeroTimes = 0;
                    boolean result = this.dispatchReadRequest();
                    // ... (the excerpt ends here)

The data is parsed in dispatchReadRequest:

    private boolean dispatchReadRequest() {
        final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
        int readSocketPos = this.byteBufferRead.position();
        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPostion;
            if (diff >= MSG_HEADER_SIZE) {
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                // The master must continue exactly where the slave's log ends
                if (slavePhyOffset != 0) {
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }
                // A complete frame (header + body) has arrived
                if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
                    this.byteBufferRead.get(bodyData);
                    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }
                    continue;
                }
            }
            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }
            break;
        }
        return true;
    }

Finally the slave replies to the master with its own latest offset through reportSlaveMaxOffsetPlus. When this.groupTransferService.notifyTransferSome(); is triggered this time, the condition is satisfied and the call returns successfully.
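The framing used between master and slave (a 12-byte header carrying an 8-byte physical offset and a 4-byte body size, followed by the body) can be sketched on its own with plain ByteBuffers. HaFrameSketch, Frame, encode and decode are hypothetical names for illustration; the sketch leaves out sockets, offset checks and buffer reallocation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical encoder/decoder for the header + body framing, for illustration only. */
class HaFrameSketch {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    /** One decoded frame. */
    static final class Frame {
        final long offset;
        final byte[] body;

        Frame(long offset, byte[] body) {
            this.offset = offset;
            this.body = body;
        }
    }

    /** Master side: build one frame, ready to be written to a channel. */
    static ByteBuffer encode(long phyOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(phyOffset).putInt(body.length).put(body);
        buf.flip();
        return buf;
    }

    /**
     * Slave side: consume every complete frame from a buffer in read mode.
     * A partial frame at the end is left unread until more data arrives.
     */
    static List<Frame> decode(ByteBuffer received) {
        List<Frame> frames = new ArrayList<>();
        while (received.remaining() >= HEADER_SIZE) {
            int start = received.position();
            long offset = received.getLong(start);     // absolute read, position unchanged
            int bodySize = received.getInt(start + 8); // absolute read, position unchanged
            if (received.remaining() < HEADER_SIZE + bodySize) {
                break; // body not fully received yet
            }
            received.position(start + HEADER_SIZE);
            byte[] body = new byte[bodySize];
            received.get(body);
            frames.add(new Frame(offset, body));
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode(0L, new byte[] {1, 2, 3});
        List<Frame> frames = decode(wire);
        System.out.println(frames.size() + " frame(s), first offset " + frames.get(0).offset);
    }
}
```

Reading the header with absolute getLong and getInt calls, as the article's dispatchReadRequest does, lets the decoder peek at the size without committing to consume a frame that has only partly arrived.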