
Message Storage in RocketMQ

2017-12-15 08:31
1. The client sends a message to any broker. On the broker side it is received by SendMessageProcessor:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
final RemotingCommand request, //
final SendMessageContext sendMessageContext, //
final SendMessageRequestHeader requestHeader) throws RemotingCommandException
2. After a series of validations, the processor calls
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
which enters DefaultMessageStore and runs
PutMessageResult result = this.commitLog.putMessage(msg);
This reaches the CommitLog class, which performs the actual storage.
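3. An object lock is taken here to keep writes ordered. To store a message, CommitLog first needs the object for the file it will write to, so it looks up the newest file. Writes are sequential: a new file is created only after the current one is full. Each file is 1 GB by default, and its name is its starting offset.
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
The message is then appended to that file and the result is returned:
AppendMessageResult result = mapedFile.appendMessage(msg, this.appendMessageCallback);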
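The naming rule from step 3 can be illustrated with a small sketch. It assumes the file name is the starting offset zero-padded to 20 digits; the padding width and the helper names fileNameFor and startOffsetOf are assumptions made for this example, not code from RocketMQ.

```java
import java.util.Locale;

class CommitLogFileNames {
    // Default commit log file size of 1 GB, as described in step 3.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Assumption: the name is the starting offset zero-padded to 20 digits.
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // Starting offset of the file that contains the given global offset.
    static long startOffsetOf(long globalOffset) {
        return globalOffset - (globalOffset % FILE_SIZE);
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 42; // a position inside the second file
        System.out.println(fileNameFor(startOffsetOf(offset)));
    }
}
```

Because the names are zero-padded offsets, sorting the files by name also sorts them by position in the log.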
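4. The MapedFile class records the in-memory write offset wrotePostion, the offset already flushed to the real file committedPosition, the file object file, the file's starting global offset fileFromOffset (which is also its fileName), and mappedByteBuffer, the virtual memory that maps the file. appendMessage takes a slice() of mappedByteBuffer, sets the slice's position to the offset already written in memory, and appends after it. The effect is the same as writing messages sequentially straight into the file.
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {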
int currentPos = this.wrotePostion.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result =
cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
this.wrotePostion.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
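}
5. The message data is now written into memory. A message consists of 20 fields in total; they are put into mappedByteBuffer, and an append result is assembled and returned. Before appending, the required message length is calculated, and 8 more bytes are reserved on top of it: 4 for the total size (the file's remaining space) and 4 for the MAGICCODE magic number. These 8 bytes mark the end of the file when it has no room left for the message. Many file formats carry a magic number of this kind to identify what the file is. The code then checks whether the file has enough space left. If not, it writes those 8 bytes, leaves the rest of the space unwritten, and returns. The default commit log file size is:
// CommitLog file size,default is 1G
    private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
The default maximum size of a single message is:
// The maximum size of a single log file,default is 4M
    private int maxMessageSize = 1024 * 1024 * 4;
If writes were fast enough and messages large enough, with asynchronous flushing selected, byteBuffer.position(currentPos); could conceivably throw an exception. In practice this should essentially never happen; it is only speculation.
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {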
this.resetMsgStoreItemMemory(maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
// 3 The remaining space may be any value
//

// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
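}
After the message is written, the corresponding offset in topicQueueTable is incremented by one. The key is built from the topic and the queueId.
6. The result status is checked. If the end of the file was reached, a new file is created and the append is retried:
switch (result.getStatus()) {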
case PUT_OK:
break;
case END_OF_FILE:
unlockMapedFile = mapedFile;
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
The details are in mapedFileQueue.getLastMapedFile, which checks whether the last file in the file list is full:
if (mapedFileLast != null && mapedFileLast.isFull()) {
createOffset = mapedFileLast.getFileFromOffset() + this.mapedFileSize;
}
 public boolean isFull() {
        return this.fileSize == this.wrotePostion.get();
    }
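The append path from steps 4 to 6 can be sketched in a few lines. This is a simplified illustration under stated assumptions: a heap ByteBuffer stands in for mappedByteBuffer, a record is just a 4-byte length plus the body instead of the 20 fields, and the value of BLANK_MAGIC_CODE is hypothetical. The class and method names are invented for the example.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class AppendSketch {
    static final int END_FILE_MIN_BLANK_LENGTH = 8; // 4-byte size + 4-byte magic code
    static final int BLANK_MAGIC_CODE = 0xBB;       // hypothetical value, for illustration only

    enum Status { PUT_OK, END_OF_FILE }

    static final class MappedFileSketch {
        final ByteBuffer buffer; // stand-in for mappedByteBuffer (assumption: heap buffer)
        final AtomicInteger wrotePosition = new AtomicInteger(0);
        final int fileSize;

        MappedFileSketch(int fileSize) {
            this.fileSize = fileSize;
            this.buffer = ByteBuffer.allocate(fileSize);
        }

        boolean isFull() {
            return fileSize == wrotePosition.get();
        }

        Status append(byte[] body) {
            int currentPos = wrotePosition.get();
            if (currentPos >= fileSize) {
                return Status.END_OF_FILE; // nothing left to write into
            }
            // The slice shares the content but has its own position.
            ByteBuffer slice = buffer.slice();
            slice.position(currentPos);
            int maxBlank = fileSize - currentPos;
            int msgLen = 4 + body.length; // simplified record: length prefix + body
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
                // Not enough room: write the 8-byte end marker and claim the rest.
                slice.putInt(maxBlank);
                slice.putInt(BLANK_MAGIC_CODE);
                wrotePosition.addAndGet(maxBlank);
                return Status.END_OF_FILE;
            }
            slice.putInt(msgLen);
            slice.put(body);
            wrotePosition.addAndGet(msgLen);
            return Status.PUT_OK;
        }
    }

    public static void main(String[] args) {
        MappedFileSketch file = new MappedFileSketch(32);
        System.out.println(file.append(new byte[10]));
        System.out.println(file.append(new byte[10]));
        System.out.println(file.isFull());
    }
}
```

Always reserving 8 bytes means that whenever a record does not fit, there is still room for the end marker, so a reader scanning the file can tell padding from a message.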
7. The file is then created. Two files are requested at a time:
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath =
                    this.storePath + File.separator
                            + UtilAll.offset2FileName(createOffset + this.mapedFileSize);
mapedFile =
this.allocateMapedFileService.putRequestAndReturnMapedFile(nextFilePath,
nextNextFilePath, this.mapedFileSize);
The allocation request is put into a PriorityBlockingQueue:
boolean offerOK = this.requestQueue.offer(nextReq);
and the caller then waits with a timeout:
boolean waitOK = result.getCountDownLatch().await(WaitTimeOut, TimeUnit.MILLISECONDS);
Once the broker has started, the AllocateMapedFileService thread keeps taking requests from this queue:
while (!this.isStoped() && this.mmapOperation())
req = this.requestQueue.take();
It creates the file:
MapedFile mapedFile = new MapedFile(req.getFilePath(), req.getFileSize());
and after the file is created it wakes the blocked caller:
req.getCountDownLatch().countDown();
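The handshake in step 7, where a caller queues a request and blocks on a latch until a background thread has served it, can be sketched as follows. This is a minimal illustration, not the RocketMQ implementation: the class names are invented, a String stands in for the created MapedFile, and ordering requests by file path is an assumption made so that the priority queue has something to compare.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class AllocateSketch {
    static final class AllocateRequest implements Comparable<AllocateRequest> {
        final String filePath;
        final int fileSize;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String created; // stand-in for the created MapedFile

        AllocateRequest(String filePath, int fileSize) {
            this.filePath = filePath;
            this.fileSize = fileSize;
        }

        // Assumption: order by path, so lower offsets are served first.
        @Override
        public int compareTo(AllocateRequest other) {
            return filePath.compareTo(other.filePath);
        }
    }

    static final BlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();

    // Background thread: take a request, "create" the file, release the waiter.
    static Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    AllocateRequest req = requestQueue.take();
                    req.created = "mapped:" + req.filePath;
                    req.latch.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "allocate-sketch");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Caller side: enqueue, then wait with a timeout. Returns null on failure.
    static String putRequestAndWait(String path, int size, long timeoutMillis) {
        AllocateRequest req = new AllocateRequest(path, size);
        if (!requestQueue.offer(req)) {
            return null;
        }
        try {
            boolean waitOK = req.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return waitOK ? req.created : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        startWorker();
        System.out.println(putRequestAndWait("00000000000000000000", 1024, 5000L));
    }
}
```

Requesting the next file ahead of time, as step 7 does with nextNextFilePath, lets the background thread prepare it before the writer actually needs it.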
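8. Flushing to disk is either synchronous or asynchronous.
In synchronous mode the request goes straight to GroupCommitService.putRequest(request); and the caller waits for it with a timeout:
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
putRequest adds the request to the write list with this.requestsWrite.add(request);. The service thread keeps swapping the read and write lists:
this.waitForRunning(0);
this.doCommit();
and when it finds requests it commits up to their offsets.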
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
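The hand-off between writers and the flush thread in step 8 can be sketched as a pair of lists that are swapped under a lock. This is a simplified illustration under stated assumptions: the class and method names are invented, flushedOffset stands in for the real flush, and doCommitOnce runs one pass of what the service thread would do in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class GroupCommitSketch {
    static final class Request {
        final long nextOffset; // offset that must be on disk before the caller returns
        final CountDownLatch latch = new CountDownLatch(1);
        volatile boolean flushOK;

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void wakeup(boolean ok) {
            flushOK = ok;
            latch.countDown();
        }

        boolean waitForFlush(long timeoutMillis) {
            try {
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOK;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    private long flushedOffset = 0; // stand-in for the committed position

    // Writers only ever touch the write list.
    synchronized void putRequest(Request request) {
        requestsWrite.add(request);
    }

    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // One pass of the service loop: swap, then serve everything in the read list.
    void doCommitOnce(long wroteOffset) {
        swapRequests();
        for (Request req : requestsRead) {
            if (flushedOffset < req.nextOffset) {
                flushedOffset = wroteOffset; // stand-in for forcing the buffer to disk
            }
            req.wakeup(flushedOffset >= req.nextOffset);
        }
        requestsRead.clear();
    }

    public static void main(String[] args) {
        GroupCommitSketch service = new GroupCommitSketch();
        Request req = new Request(100);
        service.putRequest(req);
        service.doCommitOnce(100);
        System.out.println(req.waitForFlush(1000L));
    }
}
```

Swapping the lists keeps the lock held only for the swap itself, so writers can keep adding requests while the flush thread works through the previous batch.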
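CommitLog.this.mapedFileQueue.commit(0);
The 0 is the page count flushLeastPages; each page is OS_PAGE_SIZE = 1024 * 4 bytes.
The total committed offset committedWhere is used to locate the file that needs committing:
MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
and the commit then runs:
int offset = mapedFile.commit(flushLeastPages);
This actually writes the data to the file, releases the resource, and updates the committed offset.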
public int commit(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = this.wrotePostion.get();
// Flush data from memory to disk
this.mappedByteBuffer.force();
this.committedPosition.set(value);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
this.committedPosition.set(this.wrotePostion.get());
}
}

return this.getCommittedPosition();
}
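The commit method above depends on isAbleToFlush(flushLeastPages), which is not shown in this post. The following is a sketch of one plausible reading of that check, assuming it compares page counts of OS_PAGE_SIZE bytes and treats 0 as "flush whatever is pending". The exact conditions are an assumption, and the parameters replace the fields of MapedFile for the sake of a standalone example.

```java
class FlushPagesSketch {
    static final int OS_PAGE_SIZE = 1024 * 4;

    // Assumed logic: a full file always flushes; otherwise require at least
    // flushLeastPages dirty pages, or any unflushed byte when the argument is 0.
    static boolean isAbleToFlush(int wrote, int committed, int fileSize, int flushLeastPages) {
        if (wrote == fileSize) {
            return true;
        }
        if (flushLeastPages > 0) {
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return wrote > committed;
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        System.out.println(isAbleToFlush(100, 0, fileSize, 0));
        System.out.println(isAbleToFlush(100, 0, fileSize, 4));
    }
}
```

Under this reading, the synchronous path passes 0 so that every pending byte is flushed, while a larger value would let a background flusher batch small writes.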
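After the commit, the waiting request is released with req.wakeupCustomer(flushOK);. In asynchronous mode the FlushRealTimeService thread is notified and handles the flush itself; the process is similar to the synchronous case.
9. Replication to the slave broker is also either synchronous or asynchronous. Synchronous replication applies when
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole())
holds. By default the master broker opens port 10912 to synchronize messages with the slave.
HAService service = this.defaultMessageStore.getHaService();
A sync request is submitted, which triggers the thread to swap requestsWrite and requestsRead, and the caller then waits with a timeout:
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {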
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);

service.getWaitNotifyObject().wakeupAll();

boolean flushOK =
// TODO
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
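}
When the broker starts, HAService starts a server-side listener for connections. The slave's HAClient connects to the master, and the connection is kept. The slave listens for read events, and the master listens for both read and write events. This part is written directly against NIO.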
public void start() {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
Both ends send heartbeats every 5 s. The slave's HAClient can report its current offset:
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);

for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
When the master's ReadSocketService sees this, it updates the two variables slaveRequestOffset and slaveAckOffset:
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;

HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}

HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
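The offset arithmetic in the master's read handler is easy to misread, so here is a small sketch of it in isolation. It assumes, as the code above suggests, that the slave only ever sends 8-byte offsets, so the newest complete value ends at the last multiple of 8 in the buffer. The method name latestAck and the -1 return value are choices made for this example.

```java
import java.nio.ByteBuffer;

class AckOffsetSketch {
    // Returns the newest complete 8-byte offset received so far,
    // or -1 when fewer than 8 unprocessed bytes are available.
    static long latestAck(ByteBuffer byteBufferRead, int processPosition) {
        if (byteBufferRead.position() - processPosition < 8) {
            return -1L;
        }
        // Round the write position down to a multiple of 8 to skip a partial value.
        int pos = byteBufferRead.position() - (byteBufferRead.position() % 8);
        return byteBufferRead.getLong(pos - 8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(100L);
        buf.putLong(200L);
        buf.put(new byte[3]); // start of a third offset that has not fully arrived
        System.out.println(latestAck(buf, 0));
    }
}
```

Older offsets in the buffer are skipped on purpose: only the most recent acknowledgement matters to the master.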
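Receiving the ack then triggers this.groupTransferService.notifyTransferSome();, but at this point the condition is not yet satisfied. It has to wait until the master writes the new offset and the message data to the channel: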
SelectMapedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {

int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}

long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;

selectResult.getByteBuffer().limit(size);
this.selectMapedBufferResult = selectResult;

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(HEADER_SIZE);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();
}
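The header and body are assembled, and afterwards the memory held by the body is released. The body's size is not fixed and it can be large, so it has to be released every time. The header is a fixed 12 bytes and does not need to be released.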
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
}

if (null == this.selectMapedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}

writeSizeZeroTimes = 0;

// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
}
}

boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();

if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMapedBufferResult.release();
this.selectMapedBufferResult = null;
}

return result;
}
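When the data reaches the slave, the message is first put into memory with HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);, and a separate thread then flushes it to disk periodically: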
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
The data is parsed as follows:
 private boolean dispatchReadRequest() {
            final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();
            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPostion;
                if (diff >= MSG_HEADER_SIZE) {
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                    + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                    if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
                        this.byteBufferRead.get(bodyData);
                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }
                        continue;
                    }
                }
                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }
                break;
            }
            return true;
        }
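Finally the slave reports its latest offset back to the master via reportSlaveMaxOffsetPlus. When this.groupTransferService.notifyTransferSome(); is triggered this time, the condition is satisfied and the call returns successfully.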
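The wire format used between master and slave, a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body, can be exercised with a small encoder and decoder. This is a sketch under stated assumptions: the class names are invented, the read buffer is a fixed 1024 bytes with no reallocation, and nextExpectedOffset stands in for the slave's maximum physical offset and is always checked, including when it is 0.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class HaFrameSketch {
    static final int HEADER_SIZE = 8 + 4; // phyoffset + size

    // Master side: header followed by body, ready to be written to a channel.
    static ByteBuffer encode(long phyOffset, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + body.length);
        frame.putLong(phyOffset);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip();
        return frame;
    }

    // Slave side: accumulates bytes and extracts complete frames only.
    static final class Decoder {
        private final ByteBuffer readBuf = ByteBuffer.allocate(1024); // simplification: never reallocated
        private int dispatchPosition = 0;
        final List<byte[]> bodies = new ArrayList<>();
        long nextExpectedOffset = 0; // stand-in for the slave's max physical offset

        // Returns false if the master's offset does not match what the slave expects.
        boolean feed(ByteBuffer chunk) {
            readBuf.put(chunk);
            int readSocketPos = readBuf.position();
            while (readBuf.position() - dispatchPosition >= HEADER_SIZE) {
                long masterPhyOffset = readBuf.getLong(dispatchPosition);
                int bodySize = readBuf.getInt(dispatchPosition + 8);
                if (masterPhyOffset != nextExpectedOffset) {
                    return false;
                }
                if (readBuf.position() - dispatchPosition < HEADER_SIZE + bodySize) {
                    break; // body not fully received yet
                }
                byte[] body = new byte[bodySize];
                readBuf.position(dispatchPosition + HEADER_SIZE);
                readBuf.get(body);
                readBuf.position(readSocketPos); // restore the position for further reads
                bodies.add(body);
                nextExpectedOffset += bodySize;
                dispatchPosition += HEADER_SIZE + bodySize;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Decoder decoder = new Decoder();
        decoder.feed(encode(0L, new byte[] {1, 2, 3}));
        System.out.println(decoder.bodies.size());
    }
}
```

Keeping a separate dispatchPosition lets the decoder cope with frames that arrive split across several socket reads, which is the situation the diff checks in dispatchReadRequest guard against.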