您的位置:首页 > 其它

kafka协议-record解析

2017-08-01 00:00 113 查看

介绍

Kafka的消息是以record的形式存储的。

Record

Record是接口,DefaultRecord实现了Record接口。

DefaultRecord的存储结构

Record =>
Length => varint    #  record总长度(不包括Length本身)
Attributes => int8    # 属性
TimestampDelta => varlong    # timestamp的偏移量(相对于RecordBatch的baseTimestamp)
OffsetDelta => varint    # offset的偏移量(相对于RecordBatch的baseOffset)
KeyLen => varint    # key的长度
Key => data    # key的数据
ValueLen => varint    # value的长度
Value => data    # value的数据
NumHeaders => varint    # header的数量
Headers => [Header]    # Header列表

Header =>
HeaderKeyLen => varint    # key的长度
HeaderKey => string    # key的数据
HeaderValueLen => varint    # value的长度
HeaderValue => data    # value的数据

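上面的数据类型中有varint和varlong,它们采用的是Protocol Buffers的变长编码:先做ZigZag编码,再按7位一组写出。因此数值(包括像-1这样绝对值较小的负数)越小,占用的字节越少,从而节省存储空间。例如表示null的长度-1只占1个字节,这就是下文DefaultRecord中NULL_VARINT_SIZE_BYTES的值。具体参考链接https://developers.google.com/protocol-buffers/docs/encoding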

DefaultRecord类

DefaultRecord是对上述数据结构的封装。

public class DefaultRecord implements Record {

// Upper bound on the fixed per-record encoding overhead, excluding key, value and headers:
// 5 bytes length (max varint) + 10 bytes timestamp delta (max varlong)
// + 5 bytes offset delta (max varint) + 1 byte attributes.
public static final int MAX_RECORD_OVERHEAD = 5 + 10 + 5 + 1;

// Encoded size of the -1 length varint that marks a null key, value or header value.
private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

// Total encoded size of the record, including its Length varint prefix.
private final int sizeInBytes;
// Raw Attributes byte, as read from the record.
private final byte attributes;
// Offset of the record; readFrom sets it to batch baseOffset + record offsetDelta.
private final long offset;
// Timestamp of the record; readFrom sets it to baseTimestamp + delta, or to logAppendTime if given.
private final long timestamp;
// Sequence number, or RecordBatch.NO_SEQUENCE when the batch has no base sequence.
private final int sequence;
// Key bytes; null when the encoded key length was negative.
private final ByteBuffer key;
// Value bytes; null when the encoded value length was negative.
private final ByteBuffer value;
// Decoded headers (Record.EMPTY_HEADERS when the record has none, as built by readFrom).
private final Header[] headers;

/**
 * Creates a record from already-decoded values. Buffers and the header array are stored by
 * reference, not copied.
 */
private DefaultRecord(int sizeInBytes,
                      byte attributes,
                      long offset,
                      long timestamp,
                      int sequence,
                      ByteBuffer key,
                      ByteBuffer value,
                      Header[] headers) {
    // payload
    this.key = key;
    this.value = value;
    this.headers = headers;
    // positioning within the batch
    this.offset = offset;
    this.sequence = sequence;
    this.timestamp = timestamp;
    // encoding metadata
    this.attributes = attributes;
    this.sizeInBytes = sizeInBytes;
}

/**
 * Returns the offset of this record. readFrom sets it to the batch base offset plus the
 * record's offset delta.
 */
@Override
public long offset() {
    return this.offset;
}

/**
 * Returns the sequence number of this record. readFrom derives it from the batch base
 * sequence, or uses RecordBatch.NO_SEQUENCE when the batch has no base sequence.
 */
@Override
public int sequence() {
    return this.sequence;
}

........

/**
 * Returns the headers of this record. The internal array is returned directly, not copied.
 */
@Override
public Header[] headers() {
    return this.headers;
}

/**

DefaultRecord读取

DefaultRecord提供了从buffer(或DataInput)中读取数据并实例化DefaultRecord对象的方法。

/**
 * Reads one record (Length varint followed by the record body) from a {@link DataInput}.
 *
 * <p>The body is copied into a freshly allocated heap buffer and then decoded. baseOffset,
 * baseTimestamp and baseSequence are attributes of the enclosing RecordBatch.
 *
 * @param input          source positioned at the record's Length varint
 * @param baseOffset     base offset of the enclosing batch
 * @param baseTimestamp  base timestamp of the enclosing batch
 * @param baseSequence   base sequence of the enclosing batch (negative means none)
 * @param logAppendTime  if non-null, overrides the record's timestamp
 * @return the decoded record
 * @throws IOException            if reading from {@code input} fails, e.g. because it ends before
 *                                the declared body size has been read
 * @throws InvalidRecordException if the declared length is negative or the body is malformed
 */
public static DefaultRecord readFrom(DataInput input,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) throws IOException {
    // Length prefix: size of the body, excluding the prefix itself.
    int sizeOfBodyInBytes = ByteUtils.readVarint(input);
    // A negative length can only come from corrupt data. Reject it explicitly so callers see the
    // same InvalidRecordException as for every other malformed record. Otherwise
    // ByteBuffer.allocate would throw a bare IllegalArgumentException for a negative capacity.
    if (sizeOfBodyInBytes < 0)
        throw new InvalidRecordException("Invalid negative record size " + sizeOfBodyInBytes);
    ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
    input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
    // Total size includes the Length varint itself.
    int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
            baseSequence, logAppendTime);
}

/**
 * Reads one record (Length varint followed by the record body) from {@code buffer}, starting at
 * its current position. baseOffset, baseTimestamp and baseSequence are attributes of the
 * enclosing RecordBatch.
 *
 * @return the decoded record, or {@code null} if fewer bytes remain than the Length declares.
 *         In that case the Length varint has already been consumed from the buffer.
 */
public static DefaultRecord readFrom(ByteBuffer buffer,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) {
    final int bodySize = ByteUtils.readVarint(buffer);
    final boolean truncated = bodySize > buffer.remaining();
    if (truncated) {
        return null;
    }
    // The record's total size counts the Length prefix as well as the body.
    final int recordSize = bodySize + ByteUtils.sizeOfVarint(bodySize);
    return readFrom(buffer, recordSize, bodySize, baseOffset, baseTimestamp, baseSequence,
            logAppendTime);
}

/**
 * Decodes the body of a single record (everything after the Length varint) from {@code buffer}.
 *
 * <p>On entry the buffer position must be at the Attributes byte. On success it has been advanced
 * past the last header. Offset, timestamp and sequence are stored as deltas and are resolved here
 * against the enclosing batch's base values.
 *
 * @param buffer            buffer positioned at the start of the record body
 * @param sizeInBytes       total encoded size of the record, including the Length varint
 * @param sizeOfBodyInBytes body size declared by the Length varint
 * @param baseOffset        base offset of the enclosing batch
 * @param baseTimestamp     base timestamp of the enclosing batch
 * @param baseSequence      base sequence of the enclosing batch; a negative value yields
 *                          {@code RecordBatch.NO_SEQUENCE}
 * @param logAppendTime     if non-null, replaces the computed timestamp
 * @return the decoded record
 * @throws InvalidRecordException if the record is malformed: a negative or impossibly large header
 *                                count, a body whose consumed size differs from the declared one,
 *                                or a buffer underflow / invalid slice while decoding
 */
private static DefaultRecord readFrom(ByteBuffer buffer, int sizeInBytes,
                                      int sizeOfBodyInBytes, long baseOffset, long baseTimestamp,
                                      int baseSequence, Long logAppendTime) {
    try {
        // Remember where the body starts so the consumed size can be validated at the end.
        int recordStart = buffer.position();
        byte attributes = buffer.get();
        long timestampDelta = ByteUtils.readVarlong(buffer);
        long timestamp = baseTimestamp + timestampDelta;
        if (logAppendTime != null)
            timestamp = logAppendTime;
        int offsetDelta = ByteUtils.readVarint(buffer);
        long offset = baseOffset + offsetDelta;

        int sequence = baseSequence >= 0 ?
                DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                RecordBatch.NO_SEQUENCE;

        // A negative key length encodes a null key.
        ByteBuffer key = null;
        int keySize = ByteUtils.readVarint(buffer);
        if (keySize >= 0) {
            // View over the key bytes without copying. limit() throws IllegalArgumentException
            // (mapped to InvalidRecordException below) if keySize exceeds what remains.
            key = buffer.slice();
            key.limit(keySize);
            buffer.position(buffer.position() + keySize);
        }

        // A negative value length encodes a null value.
        ByteBuffer value = null;
        int valueSize = ByteUtils.readVarint(buffer);
        if (valueSize >= 0) {
            value = buffer.slice();
            value.limit(valueSize);
            buffer.position(buffer.position() + valueSize);
        }

        int numHeaders = ByteUtils.readVarint(buffer);
        if (numHeaders < 0)
            throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
        // Each header takes at least two bytes (key length + value length varints), so a count
        // larger than the remaining bytes is certainly corrupt. Rejecting it here keeps
        // readHeaders from allocating an arbitrarily large Header[] from a damaged count.
        if (numHeaders > buffer.remaining())
            throw new InvalidRecordException("Found invalid number of record headers. " + numHeaders +
                    " is larger than the remaining size of the buffer");
        final Header[] headers;
        if (numHeaders == 0)
            headers = Record.EMPTY_HEADERS;
        else
            headers = readHeaders(buffer, numHeaders);

        // The fields must account for exactly the declared body size.
        if (buffer.position() - recordStart != sizeOfBodyInBytes)
            throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                    " bytes in record payload, but instead read " + (buffer.position() - recordStart));

        return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
    } catch (BufferUnderflowException | IllegalArgumentException e) {
        throw new InvalidRecordException("Found invalid record structure", e);
    }
}

读取headers

/**
 * Decodes {@code numHeaders} consecutive headers starting at the buffer's current position and
 * advances the position past them.
 *
 * <p>Each header is: key length varint, UTF-8 key bytes, value length varint, value bytes.
 * A negative value length yields a null header value.
 *
 * @throws InvalidRecordException if a header key length is negative
 */
private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
    final Header[] result = new Header[numHeaders];
    int idx = 0;
    while (idx < numHeaders) {
        final int keyLength = ByteUtils.readVarint(buffer);
        if (keyLength < 0) {
            throw new InvalidRecordException("Invalid negative header key size " + keyLength);
        }
        // Decode the key from the current position, then move the position past its bytes.
        final String headerKey = Utils.utf8(buffer, keyLength);
        buffer.position(buffer.position() + keyLength);

        final int valueLength = ByteUtils.readVarint(buffer);
        ByteBuffer headerValue = null;
        if (valueLength >= 0) {
            // View over the value bytes without copying, then skip them in the source buffer.
            headerValue = buffer.slice();
            headerValue.limit(valueLength);
            buffer.position(buffer.position() + valueLength);
        }
        result[idx++] = new RecordHeader(headerKey, headerValue);
    }
    return result;
}

DefaultRecord长度计算

/**
 * Computes the total encoded size of a record: its body plus the Length varint that prefixes it.
 * A negative keySize/valueSize denotes a null key/value.
 */
public static int sizeInBytes(int offsetDelta,
                              long timestampDelta,
                              int keySize,
                              int valueSize,
                              Header[] headers) {
    final int body = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
    final int lengthPrefix = ByteUtils.sizeOfVarint(body);
    return lengthPrefix + body;
}

/**
 * Computes the encoded body size (everything after the Length varint) for a record whose key and
 * value are given as buffers. Only the remaining bytes of each buffer count; a null buffer is
 * sized as length -1.
 */
private static int sizeOfBodyInBytes(int offsetDelta,
                                     long timestampDelta,
                                     ByteBuffer key,
                                     ByteBuffer value,
                                     Header[] headers) {
    int keySize = -1;
    if (key != null) {
        keySize = key.remaining();
    }
    int valueSize = -1;
    if (value != null) {
        valueSize = value.remaining();
    }
    return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
}

/**
 * Computes the encoded body size (everything after the Length varint): the attributes byte,
 * the offsetDelta varint, the timestampDelta varlong, and the key, value and header sections.
 */
private static int sizeOfBodyInBytes(int offsetDelta,
                                     long timestampDelta,
                                     int keySize,
                                     int valueSize,
                                     Header[] headers) {
    final int attributesSize = 1;
    return attributesSize
            + ByteUtils.sizeOfVarint(offsetDelta)
            + ByteUtils.sizeOfVarlong(timestampDelta)
            + sizeOf(keySize, valueSize, headers);
}

/**
 * Computes the encoded size of the key, value and header sections of a record body.
 * A negative keySize/valueSize denotes null and costs only the -1 length varint.
 *
 * @throws IllegalArgumentException if {@code headers} is null or any header has a null key
 */
private static int sizeOf(int keySize, int valueSize, Header[] headers) {
    int total = keySize < 0
            ? NULL_VARINT_SIZE_BYTES
            : ByteUtils.sizeOfVarint(keySize) + keySize;
    total += valueSize < 0
            ? NULL_VARINT_SIZE_BYTES
            : ByteUtils.sizeOfVarint(valueSize) + valueSize;

    if (headers == null)
        throw new IllegalArgumentException("Headers cannot be null");

    // NumHeaders varint, then for each header: key length + UTF-8 key bytes,
    // value length + value bytes (a null value costs only the -1 length varint).
    total += ByteUtils.sizeOfVarint(headers.length);
    for (int i = 0; i < headers.length; i++) {
        final Header h = headers[i];
        final String k = h.key();
        if (k == null)
            throw new IllegalArgumentException("Invalid null header key found in headers");
        final int kLen = Utils.utf8Length(k);
        total += ByteUtils.sizeOfVarint(kLen) + kLen;

        final byte[] v = h.value();
        total += v == null
                ? NULL_VARINT_SIZE_BYTES
                : ByteUtils.sizeOfVarint(v.length) + v.length;
    }
    return total;
}

概括

Kafka的一条消息对应一条Record。DefaultRecord实现了Record接口,其数据结构采用了varint/varlong变长类型,减少了存储空间。Record依赖于RecordBatch存储,其中的offset、timestamp等字段存储的都是相对于RecordBatch的baseOffset、baseTimestamp的增量。RecordBatch将在下一节介绍。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka record