您的位置:首页 > 编程语言 > Java开发

netty5.0源码解析 ByteBuf和相关辅助类

2015-01-20 16:27 316 查看

1.2 ByteBuf源码分析

1.2.6 PooledDirectByteBuf源码分析

本文是在netty权威指南的基础上写的。原书作者因篇幅原因没有深入讨论源码。个人在此作不定期的补充。

PooledDirectByteBuf基于内存池实现,与UnPooledDirectByteBuf的唯一不同就是缓冲区的分配和销毁策略不同,其他功能都是等同的,也就是说,两者唯一的不同就是内存分配策略不同。

1.创建字节缓冲区实例

由于采取内存池实现,所以新创建PooledDirectByteBuf对象时不能直接new一个实例,而是从内存池中获取,然后设置引用计数器的值。

static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();//从内存池中获取对象
buf.setRefCnt(1);//设置引用计数器
buf.maxCapacity(maxCapacity);
return buf;
}

直接从内存池Recycler<PooledDirectByteBuf> RECYCLER中获取PooledDirectByteBuf对象,然后设置它的引用计数器为1,设置缓冲区最大容量后返回。

PooledDirectByteBuf维护了一个Recycler内存池。重写覆盖了 protected abstract T newObject(Handle<T> handle)。

private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};

RECYCLER的get()方法会先从stack中获取对象,如果对象不存在则调用重写的newObject(Handle<PooledDirectByteBuf> handle)来得到新的对象

然后具体来看这个对象内存池Recycler<T>。Recycler is a Light-weight object pool based on a thread-local stack。它是一个依赖于线程局部栈的轻量级对象池。

首先是一个内部接口 Handle<T>,一个方法void recycle(T object)

public interface Handle<T> {
void recycle(T object);
}

接着就是一个静态内部类Stack<T>实现 Handle<T>。Stack<T>提供了pop()和push()方法,以栈的形式来管理内存池中的对象。它的底层使用的依旧是数组:private T[] elements。

static final class Stack<T> implements Handle<T> {

private static final int INITIAL_CAPACITY = 256;

final Recycler<T> parent;
final Thread thread;
private T[] elements;//底层使用数组实现存储
private int size;
private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);

@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
Stack(Recycler<T> parent, Thread thread) {
this.parent = parent;
this.thread = thread;
elements = newArray(INITIAL_CAPACITY);
}

@Override
public void recycle(T object) {
parent.recycle(object, this);
}

T pop() {
int size = this.size;
if (size == 0) {
return null;
}
size --;
T ret = elements[size];
elements[size] = null;
map.remove(ret);
this.size = size;
return ret;
}

void push(T o) {
if (map.put(o, Boolean.TRUE) != null) {
throw new IllegalStateException("recycled already");
}

int size = this.size;
if (size == elements.length) {
T[] newElements = newArray(size << 1);
System.arraycopy(elements, 0, newElements, 0, size);
elements = newElements;
}

elements[size] = o;
this.size = size + 1;
}

@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
private static <T> T[] newArray(int length) {
return (T[]) new Object[length];
}
}

Recycler<T>中还定义了一个ThreadLocal<Stack<T>> threadLocal。ThreadLocal使得各线程能够保持各自独立的一个对象(每个线程对应一个实例)。通过ThreadLocal.set()将这个新创建的对象的引用保存到各线程的自己的一个map中,每个线程都有这样一个map,执行ThreadLocal.get()时,各线程从自己的map中取出放进去的对象或者调用initialValue()创建一个新对象,因此取出来的是各自线程中的对象,ThreadLocal实例是作为map的key来使用的。

ThreadLocal类接口很简单,只有4个方法:

void set(T value)设置当前线程的线程局部变量的值。

public T get()该方法返回当前线程所对应的线程局部变量。

public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。

protected T initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(T)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。

关于ThreadLocal最后说一点,ThreadLocal不是用来解决对象共享访问问题的,它仅仅只是让各个线程都拥有了各自的对象。threadLocal本身仅仅只是作为一个key来帮助存储目标对象,它跟目标对象没有任何的联系仅仅只是些关于ThreadLocalMap操作方法。目标对象通过Thread中的ThreadLocalMap跟线程关联。所以不需要考虑在threadLocal上面的同步问题,除非你在线程方法中去修改threadLocal自身的一些状态,但这根本没必要。所以将它设置为静态变量就可以了免得每个线程都会有个threadLocal而浪费资源。多个线程共用一个threadLocal,但各个线程都拥有独立的目标对象。

2.复制新的字节缓冲区实例

调用copy(int index, int length)可以复制一个新的实例,与原来的PooledDirectByteBuf独立

@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}


首先是索引和长度的合法性校验。然后调用PooledByteBufAllocator分配一个新的ByteBuf。PooledByteBufAllocator没有实现directBuffer(int initialCapacity, int maxCapacity),所以最终会调用AbstractByteBufAllocator的directBuffer(int initialCapacity, int maxCapacity)

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);//由子类去实现具体的策略
}


上面的 newDirectBuffer(initialCapacity, maxCapacity)对于不同的子类有不同的实现策略,如果是基于内存池的分配器,它会从内存池中获得可用的ByteBuf,如果是非内存池,则会直接创建新的byteBuf

基于内存池的缓冲区分配(PooledByteBufAllocator中):

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;

ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);//从内存池中获取
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}

return toLeakAwareBuffer(buf);
}


基于非内存池的缓冲区分配(UnpooledByteBufAllocator中):

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);//直接new创建一个新的
}

return toLeakAwareBuffer(buf);
}

3.子类实现相关的方法

ByteBuf汇总的一些接口是跟具体子类实现相关的,不同的子类功能是不同的。正如UnpooledHeapButeBuf,PooledDirectByteBuf也有子类实现相关功能。当我们操作子类实现相关方法时,需要对是否支持这些操作进行判断,否则会导致异常。

@Override
public boolean hasArray() {
return false;
}
@Override
public byte[] array() {
throw new UnsupportedOperationException("direct buffer");
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException("direct buffer");
}

@Override
public boolean hasMemoryAddress() {
return false;
}

@Override
public long memoryAddress() {
throw new UnsupportedOperationException();
}


1.3 相关的辅助类功能介绍

1.3.1 ByteBufHolder

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 源码 netty