您的位置:首页 > 其它

RocketMQ CommitLog And Index

2018-04-18 14:22 267 查看

1. Commit Log











2. Index



2.1. IndexHeader

// IndexHeader.java
// Total size of the header in bytes: four 8-byte longs followed by two 4-byte ints.
public static final int INDEX_HEADER_SIZE = 40;
// Byte offsets of each header field inside the backing buffer.
private static int beginTimestampIndex = 0;
private static int endTimestampIndex = 8;
private static int beginPhyoffsetIndex = 16;
private static int endPhyoffsetIndex = 24;
private static int hashSlotcountIndex = 32;
private static int indexCountIndex = 36;
// Buffer the header fields are loaded from and written back to.
private final ByteBuffer byteBuffer;
// In-memory copies of the header fields; load() fills them from the buffer and
// updateByteBuffer() writes them back.
private AtomicLong beginTimestamp = new AtomicLong(0);
private AtomicLong endTimestamp = new AtomicLong(0);
private AtomicLong beginPhyOffset = new AtomicLong(0);
private AtomicLong endPhyOffset = new AtomicLong(0);
private AtomicInteger hashSlotCount = new AtomicInteger(0);

// Starts at 1 rather than 0; load() also resets any non-positive stored value to 1.
// NOTE(review): presumably entry position 0 is reserved as the "invalid index" marker - confirm.
private AtomicInteger indexCount = new AtomicInteger(1);

/**
 * Creates a header view over the given buffer.
 *
 * <p>Only the reference is stored here; nothing is read from the buffer until
 * {@code load()} is called.
 *
 * @param byteBuffer buffer holding the header bytes; the fields are accessed at
 *                   absolute offsets 0 through 39
 */
public IndexHeader(final ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
}

/**
 * Reads the six header fields from the backing buffer into the in-memory
 * atomics, using absolute offsets so the buffer position is not moved.
 */
public void load() {
    final ByteBuffer header = this.byteBuffer;

    // beginTimestamp: 8-byte long, broker store time of the first message indexed in this file.
    final long firstStoreTime = header.getLong(beginTimestampIndex);
    this.beginTimestamp.set(firstStoreTime);

    // endTimestamp: 8-byte long, broker store time of the last message indexed in this file.
    final long lastStoreTime = header.getLong(endTimestampIndex);
    this.endTimestamp.set(lastStoreTime);

    // beginPhyOffset: 8-byte long, commit log offset of the first message indexed in this file.
    final long firstOffset = header.getLong(beginPhyoffsetIndex);
    this.beginPhyOffset.set(firstOffset);

    // endPhyOffset: 8-byte long, commit log offset of the last message indexed in this file.
    final long lastOffset = header.getLong(endPhyoffsetIndex);
    this.endPhyOffset.set(lastOffset);

    // hashSlotCount: 4-byte int, number of hash slots in use
    // (original author's note: this value does not appear to be used for anything).
    final int usedSlots = header.getInt(hashSlotcountIndex);
    this.hashSlotCount.set(usedSlots);

    // indexCount: 4-byte int, number of index entries built in this file.
    final int storedEntries = header.getInt(indexCountIndex);
    this.indexCount.set(storedEntries);

    // A non-positive stored count is normalized to 1, the same value the field starts with.
    final boolean countInvalid = this.indexCount.get() <= 0;
    if (countInvalid) {
        this.indexCount.set(1);
    }
}

/**
 * Writes the current in-memory header values back into the backing buffer at
 * their fixed absolute offsets.
 */
public void updateByteBuffer() {
    final ByteBuffer header = this.byteBuffer;

    // Four 8-byte longs: begin/end timestamps, then begin/end physical offsets.
    header.putLong(beginTimestampIndex, this.beginTimestamp.get());
    header.putLong(endTimestampIndex, this.endTimestamp.get());
    header.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
    header.putLong(endPhyoffsetIndex, this.endPhyOffset.get());

    // Two 4-byte ints: hash slot count, then index entry count.
    header.putInt(hashSlotcountIndex, this.hashSlotCount.get());
    header.putInt(indexCountIndex, this.indexCount.get());
}

// ---- getter/setter ----
// ...


2.2. IndexFile

// IndexFile.java
/**
 * Write path: appends one index entry for {@code key} to this index file.
 *
 * <p>Each entry is 20 bytes: key hash (4-byte int), commit log offset (8-byte long),
 * time difference in seconds relative to the header's begin timestamp (4-byte int),
 * and the entry position previously stored in the same hash slot (4-byte int).
 * The hash slot is then overwritten with the position of the new entry, so entries
 * that share a slot form a chain from newest to oldest.
 *
 * @param key            message key to index
 * @param phyOffset      offset of the message in the commit log
 * @param storeTimestamp store time of the message, in milliseconds
 * @return {@code true} if the entry was written; {@code false} if the file has
 *         reached {@code indexNum} entries or an exception occurred while writing
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // Read once: this value is both the entry count and the position of the new entry.
    final int indexCount = this.indexHeader.getIndexCount();
    if (indexCount >= this.indexNum) {
        log.warn("Over index file capacity: index count = " + indexCount
            + "; index max num = " + this.indexNum);
        return false;
    }

    final int keyHash = indexKeyHashMethod(key);
    final int slotPos = keyHash % this.hashSlotNum;
    final int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

    try {
        // Current content of the slot: position of the newest entry with this hash slot.
        int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
        if (slotValue <= invalidIndex || slotValue > indexCount) {
            // Out-of-range values are treated as "slot is empty".
            slotValue = invalidIndex;
        }

        // Stored as whole seconds, clamped to [0, Integer.MAX_VALUE]; forced to 0 while
        // the header has no begin timestamp yet.
        long timeDiff = (storeTimestamp - this.indexHeader.getBeginTimestamp()) / 1000;
        if (this.indexHeader.getBeginTimestamp() <= 0) {
            timeDiff = 0;
        } else if (timeDiff > Integer.MAX_VALUE) {
            timeDiff = Integer.MAX_VALUE;
        } else if (timeDiff < 0) {
            timeDiff = 0;
        }

        // Entries live after the header and the whole hash slot table.
        final int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + indexCount * indexSize;

        this.mappedByteBuffer.putInt(absIndexPos, keyHash);                // key hash
        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);         // commit log offset
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); // seconds since begin timestamp
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);  // previous entry in this slot's chain

        // The slot now points at the entry just written.
        this.mappedByteBuffer.putInt(absSlotPos, indexCount);

        if (indexCount <= 1) {
            // First entry of the file defines the header's begin offset and timestamp.
            this.indexHeader.setBeginPhyOffset(phyOffset);
            this.indexHeader.setBeginTimestamp(storeTimestamp);
        }

        // Count a slot as newly occupied only when it was empty before this write;
        // counting collisions too would make hashSlotCount merely track the entry count.
        if (slotValue == invalidIndex) {
            this.indexHeader.incHashSlotCount();
        }
        this.indexHeader.incIndexCount();
        this.indexHeader.setEndPhyOffset(phyOffset);
        this.indexHeader.setEndTimestamp(storeTimestamp);

        return true;
    } catch (Exception e) {
        log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
    }

    return false;
}

/**
 * Maps a key to a non-negative hash value.
 *
 * @param key key to hash
 * @return the absolute value of {@code key.hashCode()}, or 0 when the hash code is
 *         {@code Integer.MIN_VALUE}
 */
public int indexKeyHashMethod(final String key) {
    final int rawHash = key.hashCode();
    // Math.abs(Integer.MIN_VALUE) overflows and stays negative, so that one value maps to 0.
    return rawHash == Integer.MIN_VALUE ? 0 : Math.abs(rawHash);
}

/**
 * Read path: collects commit log offsets of entries whose key hash equals the hash of
 * {@code key} and whose reconstructed store time lies within {@code [begin, end]}.
 *
 * <p>The lookup starts at the entry referenced by the key's hash slot and follows each
 * entry's "previous entry" field. Matching offsets are appended to {@code phyOffsets}
 * until it holds {@code maxNum} elements or the chain ends. Nothing is done when
 * {@code mappedFile.hold()} returns false. Exceptions raised while reading are logged
 * and not propagated.
 *
 * @param phyOffsets output list that receives matching commit log offsets
 * @param key        key to look up
 * @param maxNum     stop once {@code phyOffsets} contains at least this many elements
 * @param begin      lower bound of the store time window, in milliseconds (inclusive)
 * @param end        upper bound of the store time window, in milliseconds (inclusive)
 * @param lock       has no effect in this implementation
 */
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (!this.mappedFile.hold()) {
        return;
    }

    final int wantedHash = indexKeyHashMethod(key);
    final int slotIdx = wantedHash % this.hashSlotNum;
    final int slotOffset = IndexHeader.INDEX_HEADER_SIZE + slotIdx * hashSlotSize;

    try {
        // Position of the newest entry recorded for this hash slot.
        final int head = this.mappedByteBuffer.getInt(slotOffset);

        final boolean chainUsable = head > invalidIndex
            && head <= this.indexHeader.getIndexCount()
            && this.indexHeader.getIndexCount() > 1;

        if (chainUsable) {
            int cursor = head;
            while (phyOffsets.size() < maxNum) {
                final int entryOffset =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + cursor * indexSize;

                // Entry layout: hash (4) | commit log offset (8) | seconds (4) | previous entry (4).
                final int storedHash = this.mappedByteBuffer.getInt(entryOffset);
                final long storedOffset = this.mappedByteBuffer.getLong(entryOffset + 4);
                final long secondsSinceBegin = this.mappedByteBuffer.getInt(entryOffset + 4 + 8);
                final int previous = this.mappedByteBuffer.getInt(entryOffset + 4 + 8 + 4);

                // A negative time difference ends the walk.
                if (secondsSinceBegin < 0) {
                    break;
                }

                final long storeTime =
                    this.indexHeader.getBeginTimestamp() + secondsSinceBegin * 1000L;
                final boolean inWindow = storeTime >= begin && storeTime <= end;

                if (wantedHash == storedHash && inWindow) {
                    phyOffsets.add(storedOffset);
                }

                // Stop on an invalid or self-referencing link, or once the entry's time
                // is already earlier than the start of the window.
                final boolean chainEnds = previous <= invalidIndex
                    || previous > this.indexHeader.getIndexCount()
                    || previous == cursor
                    || storeTime < begin;
                if (chainEnds) {
                    break;
                }

                cursor = previous;
            }
        }
    } catch (Exception e) {
        log.error("selectPhyOffset exception ", e);
    } finally {
        this.mappedFile.release();
    }
}


3. 参考资料

略。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RocketMQ commit-log