您的位置:首页 > 其它

netty(十一)源码分析之ByteBuf 二

2017-08-23 20:05 996 查看
3.readerIndex和writerIndex
Netty提供了两个指针变量用于支持顺序读取和写入操作:readerIndex用于标识读取索引,writerIndex用于标识写入索引。两个位置指针将ByteBuf缓冲区分割成三个区域,如下图所示:


调用ByteBuf的read操作时,从readerIndex处开始读取。readerIndex到writerIndex之间的空间为可读的字节缓冲区:从writerIndex到capacity之间为可写的字节缓冲区;0到readerIndex之间是已经读取过的缓冲区,可以调用discardReadBytes操作来重用这部分空间,以节约内存,防止ByteBuf的动态扩张。这在私有协议栈消息解码的时候非常有用,因为TCP底层可能粘包,几百个整包消息被TCP粘包后作为一个整包发送。这样,通过discardReadBytes操作可以重用之前已经解码过的缓冲区,从而防止接收缓冲区因为容量不足导致的扩张。但是,discarReadBytes操作是把双刃剑,不能滥用

4.Discardable bytes

相比于其他的Java对象,缓冲区的分配和释放是个耗时的操作,因此,我们需要尽量重用它们。由于缓冲区的动态扩张需要进行字节数组的复制,它是个耗时的操作,因此,为了最大程度地提升性能,往往需要尽最大努力提升缓冲区的重用率。

加入缓冲区包含了N个整包消息,每个消息的长度为L,消息的可写字节数为R。当读取M个整包消息后,如果调用ByteBuf做压缩或者discardReadBytes操作,则可写的缓冲区长度依然为R。如果调用discardReadBytes操作,则可写字节数会变为R=(R+M*L),之前已经读取的M个整包的空间会被重用。假如此时ByteBuf需要写入R+1个字节,则不需要动态扩张ByteBuf.

ByteBuf的discardReadBytes操作效果图如下:

操作之前如图:



操作之后如图:



需要指出的是,调用discarReadBytes会发生字节数组的内存复制,所以,频繁调用将会导致性能下降,因此在调用它之前要确认你确实需要这样做,例如牺牲性能来换取更多的可用内存。

需要指出的是,调用discardReadBytes操作之后的writable bytes 内容处理策略跟ByteBuf接口的具体实现有关。

public ByteBuf discardReadBytes() {
this.ensureAccessible();
if(this.readerIndex == 0) {
return this;
} else {
if(this.readerIndex != this.writerIndex) {
this.setBytes(0, this, this.readerIndex, this.writerIndex - this.readerIndex);
this.writerIndex -= this.readerIndex;
this.adjustMarkers(this.readerIndex);
this.readerIndex = 0;
} else {
this.adjustMarkers(this.readerIndex);
this.writerIndex = this.readerIndex = 0;
}

return this;
}
}
5.Readable bytes和Writable bytes

可读空间段是数据实际存储的区域,以read或者skip开头的任何操作都将会从readerIndex开始读取或者跳过指定的数据,操作完成之后readerIndex增加了读取或者跳过的字节数长度。如果读取的字节数长度大于实际可读的字节数,则抛出IndexOutOfBoundsException。当新分配,包装或者复制一个新的ByteBuf对象时,它的readerIndex为0.

可写空间段是尚未被使用可以填充的空闲空间,任何以write开头的操作都会从writerIndex开始向空闲空间写入字节,操作完成之后writerIndex增加了写入的字节数长度。如果写入的字节数大于可写的字节数,则会抛出IndexOutOfBoundsException异常。新分配一个ByteBuf对象时,它的readerIndex为0.通过包装或者复制的方式创建一个新的ByteBuf对象时,它的writerIndex是ByteBuf的容量。

6.Clear操作

正如JDK ByteBuffer的clear操作,它并不会清空缓冲区内容本身,例如填充为NUL(0x00).它主要用来操作位置指针,例如position,limit和mark。对于ByteBuf,它也是用来操作readerIndex和writerIndex,将它们还原为初始分配值。

Clear()操作之前如下图所示:



Clear()操作之后如下图所示:



7.Mark和Rest

当对缓冲区进行读操作时,由于某种原因,可能需要对之前的操作进行回滚。读操作并不会改变缓冲区的内容,回滚操作主要就是重新设置索引信息。

对于JDK的ByteBuffer,调用mark操作会将当前的位置指针备份到mark变量中,当调用rest操作之后,重新将指针的当前位置恢复为备份在mark中的值。

Netty的ByteBuf也有类似的rest和mark接口,因为ByteBuf有读索引和写索引,因此,它共有4个相关的方法,分别如下:

markReaderIndex:将当前的readerIndex备份到markedReaderIndex中;
resetReaderIndex:将当前的readerIndex设置为markedReaderIndex;
markWriterIndex:将当前的writerIndex备份到markedWriterIndex;
resetWriteIndex:将当前的writerIndex设置为markedWriterIndex。

public ByteBuf markReaderIndex() {
this.markedReaderIndex = this.readerIndex;
return this;
}

public ByteBuf resetReaderIndex() {
this.readerIndex(this.markedReaderIndex);
return this;
}

public ByteBuf markWriterIndex() {
this.markedWriterIndex = this.writerIndex;
return this;
}

public ByteBuf resetWriterIndex() {
this.writerIndex = this.markedWriterIndex;
return this;
}
8.查找操作

很多时候,需要从ByteBuf中查找某个字符,例如通过“\r\n”作为文本字符串的换行符,利用“NUL(0x00)”作为分隔符。

ByteBuf提供了多种查找方法用于满足不同的应用场景,详细分类如下。

(1)indexOf(int fromIndex,int toIndex,byte value):从当前ByteBuf中定位出首次出现value的位置。起始索引为fromIndex,终点是toIndex。如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。

(2)bytesBefore(byte value):从当前ByteBuf中定位出首次出现value的位置。起始索引为readerIndex,终点是writerIndex。如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。该方法不会修改readerIndex和writerIndex。

(3)bytesBefore(int length,byte value):从当前ByteBuf中定位出首次出现value的位置。起始索引为readerIndex,终点是readerIndex+length。如果没有查找到则返回-1,否则返回第一条满足索引条件的位置索引。如果length大于当前字节缓冲区的可读字节数,则抛出IndexOutOfBoundsException异常。

(4)bytesBefor(int index,int leng,byte value):从当前ByteBuf中定位首次出现value的位置。起始索引为index,终点是index+length.如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。如果index+length大于当前字节缓冲区的容量,则抛出IndexOutOfBoundsException异常。

(5)forEachByte(ByteBu
4000
fProcessor processor):遍历当前ByteBuf的可读字节数组,与ByteBufProcessor设置的查找条件进行对比。如果满足条件,则返回位置索引,否则返回-1.

(6)forEachByte(int index,int length,ByteBufProcessor processor):以index为起始位置,index+length为终止位置进行遍历,与ByteBufProcessor设置的查找条件进行对比。如果满足条件,则返回位置索引,否则返回-1.

(7)forEachByteDesc(ByteBufProcessor processor):遍历当前ByteBuf的可读字节数组,与ByteBufProcessor设置的查找条件进行对比。如果满足条件,则返回位置索引,否则返回-1.注意对字节数组进行迭代的时候采用逆序的方式,也就是从writerIndex-1开始迭代,直到readerIndex。

(8)forEachByteDesc(int index,int length,ByteBufProcessor processor):以index为起始位置,index+length为终止位置进行遍历,与ByteBufProcessor设置的查找条件进行对比。如果满足条件,则返回位置索引,否则返回-1.采用逆序查找的方式,从index+length-1开始,直到index。

对于查找的字节而言,存在一些常用值,例如回车换行符,常用的分隔符等,Netty为了减少业务的重复定义,在ByteBufProessor接口中对这些常用的查找字节进行了抽象,包括\n,\r等。如果希望自定义查找规则,实现ByteBufProcessor的processor接口即可。



10.转换成标准的ByteBuffer



ByteBuf源码分析

ByteBuf的主要继承关系







AbstractByteBuf源码分析

AbstractByteBuf继承自ByteBuf.ByteBuf的一些公共属性和功能会在AbstarctByteBuf中实现,下面我们对其属性和重要代码进行分析解读。

1.主要成员变量

首先,像读索引,写索引,mark,最大容量等公共属性需要定义,

我们重点关注下leakDetector,它被定义为static,意味着所有的ByteBuf实例共享同一个ResourceLeakDetector对象。ResourceLeakDetector用于检测对象是否泄露,后面我们会做专门讲解

static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector(ByteBuf.class);
int readerIndex;
private int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
private SwappedByteBuf swappedBuf;


我们发现,在AbstractByteBuf中并没有定义ByteBuf的缓冲区实现,例如byte数组或者DirectByteBuffer,原因显而易见,因为AbstractByteBuf并不清楚子类到底是基于堆内存还是直接内存,因此无法提前定义。

2.读操作簇

无论子类如何实现ByteBuf,例如UnpooledHeapByteBuf使用byte数组表示字节缓冲区,UnpooledDirectByteBuf直接使用ByteBuffer,它们的功能都是相同的,操作的结果是等价的。

read类操作的方法如下图所示:



限于篇幅,我们不能一一枚举,挑选其中框线所示的readBytes(byte[] dst,int dstIndex,int length)方法进行分析,首先看下源码实现。

public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
this.checkReadableBytes(length);
this.getBytes(this.readerIndex, dst, dstIndex, length);
this.readerIndex += length;
return this;
}
在读之前,首先对缓冲区的可用空间进行校验,校验代码如下:
protected final void checkReadableBytes(int minimumReadableBytes) {
this.ensureAccessible();
if(minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
} else if(this.readerIndex > this.writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));
}
}


如果读取的长度小于0,则抛出IllegalArgumentException异常提示参数非法;如果readerIndex+读取字节数>writeIndex,则抛出IndexOutOfBoundsException异常。由于异常中封装了详细的异常信息,所以使用者可以非常方便地进行问题定位。

校验通过之后,调用getBytes方法,从当前的都索引开始,复制length个字节到目标byte数组中。由于不同的子类复制操作的技术实现细节不同,因此该方法由子类实现.

3.写操作簇

与读取操作类似,写操作的公共行为在AbstractByteBuf中实现,它的API列表如下图:

我们选择与读取配套的writeBytes(byte[] src,int srcIndex,int length)进行分析,它的功能是将源字节数组中从srcIndex开始,到srcIndex+length截至的字节数组接入到当前的ByteBuf中。下面具体看下源码


首先对写入字节数组的长度进行合法性校验。

如果写入的字节数组长度小于0,则抛出IllegalArgumentException异常;如果写入的字节数组长度小于当前ByteBuf可写的字节数,说明可以写入成功,直接返回;如果写入的字节数组长度大于可以动态扩展的最大可写字节数,说明缓冲区无法写入超过其最大容量的字节数组,抛出IndexOutOfBoundsException。

如果当前写入的字节数组长度虽然大于目前ByteBuf的可写字节数,但是通过自身的动态扩展可以满足新的写入请求,则进行动态扩展。

public ByteBuf ensureWritable(int minWritableBytes) {
if(minWritableBytes < 0) {
throw new IllegalArgumentException(String.format("minWritableBytes: %d (expected: >= 0)", new Object[]{Integer.valueOf(minWritableBytes)}));
} else if(minWritableBytes <= this.writableBytes()) {
return this;
} else if(minWritableBytes > this.maxCapacity - this.writerIndex) {
throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
} else {
int newCapacity = this.calculateNewCapacity(this.writerIndex + minWritableBytes);
this.capacity(newCapacity);
return this;
}
}

由于很多场景下我们无法预先判断需要编码和解码的POJO对象长度,因此只能根据经验数据给个估计值。如果这个值偏大就会导致内存浪费;如果这个值偏小,遇到大消息编码的时候就会发生缓冲区溢出异常。使用者需要自己捕获这个异常,并重新计算缓冲区大小,将原来的内容复制到新的缓冲区中,然后重置指针。这种处理策略对用户非常不友好,而且稍有不慎,就会引入新的问题。

Netty的ByteBuf可以动态扩展,为了保证安全性,允许使用者指定最大的容量,在容量范围内,可以先分配个较小的初始容量,后面不够用动态扩展,这样可以达到功能和性能的最优组合。

我们继续看calculateNewCapacity方法的实现。首先需要重新计算下扩展后的容量,它有一个参数,等于writerIndex+minWritableBytes,也就是满足要求的最小容量。

首先设置门限阈值为4M,当需要的新容量正好等于门限阈值时,使用阈值作为新的缓冲区容量。如果新申请的内存空间大于阈值,不能采用倍增的方式(防止内存膨胀和浪费)扩张内存,而采用每次倍增4M的方式进行内存扩张。扩张的时候需要对扩张后的内存和最大内存(maxCapacity)进行比较,如果大于缓冲区的最大长度,则使用maxCapacity作为扩容后的缓冲区容量。

private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page

if (minNewCapacity == threshold) {
return threshold;
}

// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}

// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}

return Math.min(newCapacity, maxCapacity);
}

如果扩容后的新容量小于阈值,则以64为计数进行倍增,直到倍增后的结果大于或等于需要的容量值。

采用倍增或者步进算法的原因如下:如果以minNewCapacity作为目标容量,则本次扩容后的可写字节数刚好够本次写入使用。写入完成后,它的可写字节数会变为0,下次做写入操作的时候需要做动态扩张。这样就会形成第一次动态扩张,每次写入操作都会进行动态扩张,由于动态扩张需要进行内存复制,频繁的内存复制会导致性能下降。

采用先倍增后步进的原因如下:当内存比较小的情况下,倍增操作并不会带来太多的内存浪费,例如64字节-->128字节-->256字节,这样的内存扩张方式对于大多数应用系统是可以接受的。但是,当内存增长到一定阈值后,在进行倍增就会带来额外的内存浪费,例如10MB,采用倍增后变为20MB。但很有可能系统只需要12MB,则扩张到20M会带来8MB的内存浪费。由于每个客户端连接都可能维护自己独立的接收和发送缓冲区,这样随着客户读的线性增长,内存浪费也会成比例地增加,因此,达到某个阈值后就需要以步进的方式对内存进行平滑的扩张。

这个阈值是个经验值,不同的应用场景,这个值可能不同,此处,ByteBuf取值为4MB。

重新计算完动态扩张后的目标容量后,需要重新创建个新的缓冲区,将原缓冲区的内容复制到新创建的ByteBuf中,最后设置读写索引和mark标签等。由于不同的子类会对应不同的复制操作,所以该方法依然是个抽象方法,由子类负责实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty ByteBuf