Netty学习之旅------源码分析Netty线程本地分配机制与PooledByteBuf线程级对象池原理分析
2017-01-24 17:28
1036 查看
在讲上篇Netty内存分配的时候,没有考虑本地线程的缓存,也就是Netty在分配内存时,首先尝试从线程本地缓存中去申请,如果申请失败,才从全局分配。本章就重点分析线程缓存相关的实现。首先我们将目光投向PooledByteBufAllocator的 final PoolThreadLocalCache threadCache;该类实现的机制类似ThreadLocal,我们重点看一下PooledThreadLocalCache的源码:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
@Override
protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;
}
if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;
}
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache value) {
value.free();
}
}
上述的代码比较简单,就是每个线程轮询访问线程中的PoolArena.HeapArena、PoolArena.DirectArena。然后每个线程对象保存着PoolThreadCache对象。所谓的线程本地分配,也就是指的在PoolThreadCache中进行分配,二话不说,马上进入到PoolThreadCache源码中:
1.1 PoolThreadCache 属性与构造函数分析
在方法前,已经对构造方法的入参加了说明,关注如下两个方法。
代码@1,创建createNormalCaches 。
由于PoolThreadCache的设计理念与PoolArena一样,本身并不涉及到具体内存的存储,PoolThreadCache内部维护MemoryRegionCache[] tinySubpageHeapCaches,MemoryRegionCache[]
smallSubpageHeapCaches,其数组长度与PoolArena相同,MemoryRegionCaches[] normalHeapCaches,缓存的是noraml内存,Netty把大于pageSize小于chunkSize的空间成为normal内存。normalHeapCaches[1] 是normalHeapCaches[0] 的2倍, 先重点关注PoolThreadCache createNormalCaches 源码:
参数 numCaches,为SubPageMemoryRegionCache[]数组的长度,而cacheSize,为每一个SubPageMemoryRegionCache中缓存的内存个数,也就是SubPageMemoryRegionCache中entries[]的长度。这里的cacheSize,就是PooledByteBufAllocator
DEFAULT_TINY_CACHE_SIZE=512,DEFAULT_SMALL_CACHE_SIZE=256,DEFAULT_NORMAL_SIZE=64,其实这里的取名为DEFAULT_TINY_CACHE_LENGTH更加贴切。
代码@1,其实应该不需要与area.chunkSize做比较,因为如果超过chunkSize的内存,netty不会重复使用,直接在整个堆空间或堆外空间申请并释放。这里可能是出于代码的自我保护,得到normalHeapCaches中单个 Entry所持有的内存不超过该值。
代码@2,计算normalHeapCaches数组的长度,这里有优化的空间,用位运算:int arraySize = Math.max(1, max >> numShiftsNormalHeap ),其中numShiftsNormalHeap为
log2(pageSize)。这样做的原因,也就是normalHeapCaches 数组中的元素的大小,是以2的幂倍pageSize递增的。cacheSize默认为64,参数值来源于PooledByteBufAllocator。接下来关注PoolThreadCache的allocateTiny方法:
1.2 PoolThreadCache allocateTiny方法
代码@1,根据需要申请的内存定位数组的下标,根据上文讲解的数组长度计算逻辑,相应的定位算法就显而易见了。
代码@2,MeomoryRegionCache内部持有的 Entry entries[]数组是真正持有内存的单元,故现在将重点转移到MemoryRegionCache的讲解中。
代码@3,如果分配次数达到freeSweepAllocationThreshold,进行一次尝试释放一次。具体代码见 trim()方法的讲解。
1.2.2 关于PoolThreadCache allocateForTiny 之MemoryRegionCache 源码解读【针对1.2代码@2】
1)MemoryRegionCache属性与构造方法详解
maxUnusedCached : 256,128,32,为size的一半;head:0
;tail:0 ; maxEntriesInUse : 0; entriesInUse
: 0
2)MemoryRegionCache的allocate方法详解
代码@1,从entries数组中获取一个entry,head指针表示下一个缓存的Entry。
代码@2,如果entry.chunk为空,则表示线程里暂未缓存内存,返回false,表示从本地线程中分配失败。
代码@3,每分配出一个Entry,则entriesInUse加1,表示正在使用的entry个数。
代码@5,用entry中的内存初始化ByteBuf。
代码@6,head指针加一,如果超过entries的length,则重新从0开始,其实也就是 (head + 1) % (entires.length - 1),这里使用的是位运算。如果成功分配,则返回true, 结束本次内存的分配。
1.2.3 关于PoolThreadCache allocateForTiny 之代码@3,trim方法详解:
该方法的目的是在本地线程分配达到一定次数后,检测一下从本地线程缓存分配的效率,如果总是分配不到,就是虽然本地有缓存一定的内存,但每次分配都没有找到合适内存供分配,此时需要释内存回全局分配池,避免浪费内存。
代码@1,size()方法返回的是 (tail-head) & (length-1),表示当前缓存了但未被使用的个数。maxEntriesInUse的值,其实就是entiryesInUse的值。
代码@2,代码@3,如果缓存的并且未使用的个数如果小于允许的值(maxUnusedCached)值是放弃本次内存释放,否则,需要将head到tail这部分的内存全部释放,返回给全局内存分配池。这里我可能没有理解透彻,如果是我实现的话,entriesInUse该值不会设置为空,而是直接释放掉
tail-head这部分的内存就好,释放算法在内存分配与释放篇已经做过详细解读,这里不重复讲解:
本地线程池关于内存的分配与释放旧梳理到这里了。
2、PooledByteBuf线程本地缓存专题(线程对象池)
到目前为止,我们更加关注的是PooledByteBuf内部持有的内存的管理,重复利用,显然Netty并不满足与此,PooledByteBuf本身是否也可以缓存呢?是的,一样可以缓存,并且netty从PooledByteBuf对象本身,指向的内存从两个方面进行缓存,回收利用,并不是将单一某个面进行一起缓存。下文,将从PooledByteBuf对象的回收利用这一层面进行Netty本地线程池来进行PooledByteBuf的重复利用。重复声明一下,PooledByteBuf对象池中缓存的PooledByteBuf,并没有任何缓存区(byte[]或java.nio.ByteBuffer)关联,只是PooledByteBuf本身,从对象池中获取一个PooledByteBuf后,还需要调用initBuf等方法进行内存的分配。
请看如下代码片段:来自PooledHeapByteBuf:
关注代码@1,@2创建一个PooledHeapByteBuf,是从一个静态变量 RECYLER的get方法中获取,代码@2的写法是不是和ThreadLocal的使用非常类似,所以本专题的主角,就非Recycler莫属了。
2.1、Recycler构造方法核心属性
看到这里,为了摸清楚Recycler的内部实现原理,我们只能将目光先投向Stack类。但一看又发现Statck内部维护着这样一个数据结构:DefaultHandle<?>[] elements;也就是一个Statck类维护这样一个DefaultHandle数组,所以,我们先将目光锁定在DefaultHandle上:
2.2 DefaultHandle源码详解
DefaultHandle,是对象池中最基本的单元,由该对象包裹着实际缓存的对象。
代码@1,@2,待下文分解
代码@3 获取当前释放的线程。
代码@4,如果释放当前的线程与Statck对象保持一致,直接将对象放入到该线程对象的Statck中即可。
代码@5,大意是说我们不希望当前调用recycle方法的线程与Handle对象中statck对象的线程竟然不一致,我们需要强制一个内存排序,这个我就有点懵逼了。先理解一下该代码的含义:
将Handle对象放入到收集线程的本地缓存中,存放的是一个 Map<Stack<?>, WeakOrderQueue>,然后将Handle加入到WeakOrderQueue中,WeakOrderQueue里面存放的对象是基于一个WeakReference,弱引用,在垃圾回收的时候会被清除掉,放入进对象池中的对象,在什么地方取出来呢?是在Statck的pop方法中吗?有待进一步跟踪学习,还有根据这个收集线程的本地变量存放的类型来看,是个Map,说明不只一个键值对,那这个收集线程是什么来头呢?以上两个问题,暂时缓一缓,先移步到Statck类,分析完后,才回过头来思考。
2.3 Statck 源码分析
看一段官方的介绍:
// we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
// than the stack owner recycles: when we run out of items in our stack we iterate this collection
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
// still recycling all items.
首先对于Statck目前,我只能理解放入elements中的对象,放入队列中对象,处了DefaultHandle中的 delayedRercycled有放入,但整个Recycler中未有相关使用语句,应该是Netty有额外的线程来辅助回收,这是个待定的问题?需要我慢慢去寻址。目前先按照常规流程讲解Statck,并抛出相关问题,希望大家予以帮助:
2.3.1 重要属性与构造函数解析
上面两个方法,在了解其数据结构后,其实现思路应该很详细了,不做过多的讲解,值得留意的是Recycler是一个抽象类,需要有具体的子类在对象池中没有对象时,需要创建一个新的对象,具体创建对象的过程由其子类实现。
方法的签名:protected abstract T newObject(Handle<T> handle);
总结:
本文详细介绍了Netty线程本地内存的分配释放机制。同时提出Netty是将PooledByteBuf 与 PooledByteBuf执行的缓存区是单独进行管理的,PooledByteBuf指向的内存缓存区统一由Netty的内存分配,释放机制来管理,而与此同时,Netty实现了基于本地线程的对象池,用来重复利用PooledByteBuf本身这个对象,从Netty本地线程池获取的PooledByteBuf对象,不能直接使用,需要为它在申请内存进行初始化。本地线程池每一个线程关联一个Statck对象,该对象维护着两个仓库,一个是DefaultHandle[]
entries,用来存放对象池,还维护了另外一仓库,是用WeakOrderQueue 来维护的队列,由于整个Recycler类中,并没有对WeakOrderQueue 的 head属性进行初始化,这里的机制目前我没有想明白,只是猜测,Netty除了我们手工调用 Recycler.recycle方法外,应该有外部线程,比如定时任务之类的线程进行回收,目前未找到,由于目前对Netty的全貌并不理解,该部分的问题先留着,待后续深入后再研究,也希望志同道合的朋友提供帮助,再次十分感谢。
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
@Override
protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;
}
if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;
}
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache value) {
value.free();
}
}
上述的代码比较简单,就是每个线程轮询访问线程中的PoolArena.HeapArena、PoolArena.DirectArena。然后每个线程对象保存着PoolThreadCache对象。所谓的线程本地分配,也就是指的在PoolThreadCache中进行分配,二话不说,马上进入到PoolThreadCache源码中:
1.1 PoolThreadCache 属性与构造函数分析
final PoolArena<byte[]> heapArena; //使用轮叫轮询机制,每个线程从heapArena[]中获取一个,用于内存分配。 final PoolArena<ByteBuffer> directArena; //同上 // Hold the caches for the different size classes, which are tiny, small and normal. //针对不同大小,线程缓存的内存 private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private int allocations; private final Thread thread = Thread.currentThread(); //当前线程 private final Runnable freeTask = new Runnable() { //线程消亡后,释放资源,下文会重点讲解。 @Override public void run() { free0(); } }; // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; /* * @param heapArena 线程使用的PoolArena.HeapArena * @param directArena 线程使用的PoolArena.DirectArena * @param tinyCacheSize, tiny内存缓存的个数。默认为512 * @param smallCacheSize small内存缓存的个数,默认为256个 * @param normalCacheSize normalCacheSize缓存的个数,默认为64 * @param maxCacheBufferCapacity * normalHeapCaches中单个缓存区域的最大大小,默认为32k 也就是normalHeapCaches[length-1]中缓存的最大内存空间 * @param freeSweepAllocationThreshold 在本地线程每分配freeSweepAllocationThreshold 次内存后,检测一下是否需要释放内存。 */ PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); } if (freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + maxCachedBufferCapacity + " (expected: > 0)"); } this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); //@1 } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // The thread-local cache will keep a list of pooled buffers which must be returned to // the pool when the thread is not alive anymore. ThreadDeathWatcher.watch(thread, freeTask); }
在方法前,已经对构造方法的入参加了说明,关注如下两个方法。
代码@1,创建createNormalCaches 。
由于PoolThreadCache的设计理念与PoolArena一样,本身并不涉及到具体内存的存储,PoolThreadCache内部维护MemoryRegionCache[] tinySubpageHeapCaches,MemoryRegionCache[]
smallSubpageHeapCaches,其数组长度与PoolArena相同,MemoryRegionCaches[] normalHeapCaches,缓存的是noraml内存,Netty把大于pageSize小于chunkSize的空间成为normal内存。normalHeapCaches[1] 是normalHeapCaches[0] 的2倍, 先重点关注PoolThreadCache createNormalCaches 源码:
private static <T> NormalMemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); //@1 int arraySize = Math.max(1, max / area.pageSize); //@2 @SuppressWarnings("unchecked") NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } }
参数 numCaches,为SubPageMemoryRegionCache[]数组的长度,而cacheSize,为每一个SubPageMemoryRegionCache中缓存的内存个数,也就是SubPageMemoryRegionCache中entries[]的长度。这里的cacheSize,就是PooledByteBufAllocator
DEFAULT_TINY_CACHE_SIZE=512,DEFAULT_SMALL_CACHE_SIZE=256,DEFAULT_NORMAL_SIZE=64,其实这里的取名为DEFAULT_TINY_CACHE_LENGTH更加贴切。
代码@1,其实应该不需要与area.chunkSize做比较,因为如果超过chunkSize的内存,netty不会重复使用,直接在整个堆空间或堆外空间申请并释放。这里可能是出于代码的自我保护,得到normalHeapCaches中单个 Entry所持有的内存不超过该值。
代码@2,计算normalHeapCaches数组的长度,这里有优化的空间,用位运算:int arraySize = Math.max(1, max >> numShiftsNormalHeap ),其中numShiftsNormalHeap为
log2(pageSize)。这样做的原因,也就是normalHeapCaches 数组中的元素的大小,是以2的幂倍pageSize递增的。cacheSize默认为64,参数值来源于PooledByteBufAllocator。接下来关注PoolThreadCache的allocateTiny方法:
1.2 PoolThreadCache allocateTiny方法
/** * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) { if (area.isDirect()) { int idx = log2(normCapacity >> numShiftsNormalDirect); return cache(normalDirectCaches, idx); } int idx = log2(normCapacity >> numShiftsNormalHeap); //@1 return cache(normalHeapCaches, idx); } private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { // no cache found so just return false here return false; } boolean allocated = cache.allocate(buf, reqCapacity); //@2 if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); //@3 } return allocated; }
代码@1,根据需要申请的内存定位数组的下标,根据上文讲解的数组长度计算逻辑,相应的定位算法就显而易见了。
代码@2,MeomoryRegionCache内部持有的 Entry entries[]数组是真正持有内存的单元,故现在将重点转移到MemoryRegionCache的讲解中。
代码@3,如果分配次数达到freeSweepAllocationThreshold,进行一次尝试释放一次。具体代码见 trim()方法的讲解。
1.2.2 关于PoolThreadCache allocateForTiny 之MemoryRegionCache 源码解读【针对1.2代码@2】
1)MemoryRegionCache属性与构造方法详解
private final Entry<T>[] entries; //MemoryRegionCache真正持有内存的地方 /* private static final class Entry<T> { PoolChunk<T> chunk; //具体的PoolChunk long handle; //内存持有偏移量,高32位保存的是bitmaIdx,低32位保存的是memoryMapIdx } */ private final int maxUnusedCached; //表示允许的最大的没有使用的内存数量(已经被缓存),默认为size的一半。 private int head; // 作用类似于ByteBuf的readerIndex,从该位置获取一个缓存的Entiry。 11f85 private int tail; // 作用类似于ByteBuf的writerIndex,从该位置增加一个加入一个新的Entity private int maxEntriesInUse; // 在使用中最大的entry数量 private int entriesInUse; // 目前使用中的entry数量 @SuppressWarnings("unchecked") MemoryRegionCache(int size) { // size 默认的大小为 512, 256, 64 entries = new Entry[powerOfTwo(size)]; for (int i = 0; i < entries.length; i++) { entries[i] = new Entry<T>(); } maxUnusedCached = size / 2; //允许被缓存,但没有使用的最大数量,超过该值,则会触发内存释放操作。 }初始状态的MemoryRegionCache的各个属性的值分别为:
maxUnusedCached : 256,128,32,为size的一半;head:0
;tail:0 ; maxEntriesInUse : 0; entriesInUse
: 0
2)MemoryRegionCache的allocate方法详解
/** * Allocate something out of the cache if possible and remove the entry from the cache. */ public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = entries[head]; //@1 if (entry.chunk == null) { //@2 return false; } entriesInUse ++; //@3 if (maxEntriesInUse < entriesInUse) { maxEntriesInUse = entriesInUse; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); //@4 // only null out the chunk as we only use the chunk to check if the buffer is full or not. entry.chunk = null; //@5 head = nextIdx(head); //@6 return true; }
代码@1,从entries数组中获取一个entry,head指针表示下一个缓存的Entry。
代码@2,如果entry.chunk为空,则表示线程里暂未缓存内存,返回false,表示从本地线程中分配失败。
代码@3,每分配出一个Entry,则entriesInUse加1,表示正在使用的entry个数。
代码@5,用entry中的内存初始化ByteBuf。
代码@6,head指针加一,如果超过entries的length,则重新从0开始,其实也就是 (head + 1) % (entires.length - 1),这里使用的是位运算。如果成功分配,则返回true, 结束本次内存的分配。
1.2.3 关于PoolThreadCache allocateForTiny 之代码@3,trim方法详解:
该方法的目的是在本地线程分配达到一定次数后,检测一下从本地线程缓存分配的效率,如果总是分配不到,就是虽然本地有缓存一定的内存,但每次分配都没有找到合适内存供分配,此时需要释内存回全局分配池,避免浪费内存。
void trim() { trim(tinySubPageDirectCaches); trim(smallSubPageDirectCaches); trim(normalDirectCaches); trim(tinySubPageHeapCaches); trim(smallSubPageHeapCaches); trim(normalHeapCaches); } private static void trim(MemoryRegionCache<?>[] caches) { if (caches == null) { return; } for (MemoryRegionCache<?> c: caches) { trim(c); } } private static void trim(MemoryRegionCache<?> cache) { if (cache == null) { return; } cache.trim(); } trim的具体实现是MemoryRegionCache,现在进入到MemoryRegionCache详解: /** * Free up cached {@link PoolChunk}s if not allocated frequently enough. */ private void trim() { int free = size() - maxEntriesInUse; //@1 entriesInUse = 0; maxEntriesInUse = 0; //@2 if (free <= maxUnusedCached) { //@3 return; } int i = head; for (; free > 0; free--) { if (!freeEntry(entries[i])) { // all freed break; } i = nextIdx(i); } // Update head to point to te correct entry // See https://github.com/netty/netty/issues/2924 head = i; }在进行该方法的实现逻辑之前,我先提供一张草图,形象的反映head,tail等说明:
代码@1,size()方法返回的是 (tail-head) & (length-1),表示当前缓存了但未被使用的个数。maxEntriesInUse的值,其实就是entiryesInUse的值。
代码@2,代码@3,如果缓存的并且未使用的个数如果小于允许的值(maxUnusedCached)值是放弃本次内存释放,否则,需要将head到tail这部分的内存全部释放,返回给全局内存分配池。这里我可能没有理解透彻,如果是我实现的话,entriesInUse该值不会设置为空,而是直接释放掉
tail-head这部分的内存就好,释放算法在内存分配与释放篇已经做过详细解读,这里不重复讲解:
@SuppressWarnings({ "unchecked", "rawtypes" }) private static boolean freeEntry(Entry entry) { PoolChunk chunk = entry.chunk; if (chunk == null) { return false; } // need to synchronize on the area from which it was allocated before. synchronized (chunk.arena) { chunk.parent.free(chunk, entry.handle); } entry.chunk = null; return true; }扫描一下MemoryRegionCache类,还有一个方法我们未曾分析过,就是add方法,默认一开始MemoryRegionCache类中的Entry[] entries中的PoolChunk与handle都是空的,只有通过该add方法,将线程用过的内存缓存起来才能重复使用。我们要养成这样一个习惯,一个ByteBuf用过后,需要调用realse方法将其释放,具体到池化的PooledByteBuf,调用其realse方法,并不会将内存直接返还给JVM堆,而是放入到内存池,供重复使用,由于引入了线程本地缓存,所以在调用PooledByteBuf的release方法时,并不会将它立马返回给内存池(PoolArena),而是放入到本地线程缓存中。
/** * Add to cache if not already full. */ public boolean add(PoolChunk<T> chunk, long handle) { Entry<T> entry = entries[tail]; if (entry.chunk != null) { // cache is full return false; } entriesInUse --; entry.chunk = chunk; entry.handle = handle; tail = nextIdx(tail); return true; }
本地线程池关于内存的分配与释放旧梳理到这里了。
2、PooledByteBuf线程本地缓存专题(线程对象池)
到目前为止,我们更加关注的是PooledByteBuf内部持有的内存的管理,重复利用,显然Netty并不满足与此,PooledByteBuf本身是否也可以缓存呢?是的,一样可以缓存,并且netty从PooledByteBuf对象本身,指向的内存从两个方面进行缓存,回收利用,并不是将单一某个面进行一起缓存。下文,将从PooledByteBuf对象的回收利用这一层面进行Netty本地线程池来进行PooledByteBuf的重复利用。重复声明一下,PooledByteBuf对象池中缓存的PooledByteBuf,并没有任何缓存区(byte[]或java.nio.ByteBuffer)关联,只是PooledByteBuf本身,从对象池中获取一个PooledByteBuf后,还需要调用initBuf等方法进行内存的分配。
请看如下代码片段:来自PooledHeapByteBuf:
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { @Override protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) { return new PooledHeapByteBuf(handle, 0); } };//@2 static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); //@1 buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; }
关注代码@1,@2创建一个PooledHeapByteBuf,是从一个静态变量 RECYLER的get方法中获取,代码@2的写法是不是和ThreadLocal的使用非常类似,所以本专题的主角,就非Recycler莫属了。
2.1、Recycler构造方法核心属性
private static final int DEFAULT_MAX_CAPACITY; //对象池默认的最大容量 private static final int INITIAL_CAPACITY; //初始容量 private final int maxCapacity; //对象池的容量,由构造方法中进行初始化,默认为DEFAULT_MAX_CAPACITY。 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity); } };Recycler不是一普通的对象池,而是基于线程本地变量(缓存)实现的对象池,所以此处的threadLocal是Recycler中至关重要的数据结构。我们可以看出,Recycler为每个线程保持的是一叫Stack的对象。先跳过Statck,我们看一下Recycler对外提供了哪些方法供我们使用:
@SuppressWarnings("unchecked") public final T get() { Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; } public final boolean recycle(T o, Handle<T> handle) { DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; } h.recycle(o); return true; }
看到这里,为了摸清楚Recycler的内部实现原理,我们只能将目光先投向Stack类。但一看又发现Statck内部维护着这样一个数据结构:DefaultHandle<?>[] elements;也就是一个Statck类维护这样一个DefaultHandle数组,所以,我们先将目光锁定在DefaultHandle上:
2.2 DefaultHandle源码详解
DefaultHandle,是对象池中最基本的单元,由该对象包裹着实际缓存的对象。
public interface Handle<T> { //负责对象回收接口 void recycle(T object); } static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; //@1 private int recycleId; //@2,这两个属性待分解 private Stack<?> stack; //该Handle所在的Statck对象,上面也谈到,Statck维护一个Handle数组 private Object value; //该对象就是对象池缓存的对象,这里用 private T value更合适。 DefaultHandle(Stack<?> stack) { // 构造函数 this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Thread thread = Thread.currentThread(); //@3 if (thread == stack.thread) { //@4 stack.push(this); return; } // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) //@5 start Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(stack); if (queue == null) { delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread)); } queue.add(this); // @5 end } }
代码@1,@2,待下文分解
代码@3 获取当前释放的线程。
代码@4,如果释放当前的线程与Statck对象保持一致,直接将对象放入到该线程对象的Statck中即可。
代码@5,大意是说我们不希望当前调用recycle方法的线程与Handle对象中statck对象的线程竟然不一致,我们需要强制一个内存排序,这个我就有点懵逼了。先理解一下该代码的含义:
将Handle对象放入到收集线程的本地缓存中,存放的是一个 Map<Stack<?>, WeakOrderQueue>,然后将Handle加入到WeakOrderQueue中,WeakOrderQueue里面存放的对象是基于一个WeakReference,弱引用,在垃圾回收的时候会被清除掉,放入进对象池中的对象,在什么地方取出来呢?是在Statck的pop方法中吗?有待进一步跟踪学习,还有根据这个收集线程的本地变量存放的类型来看,是个Map,说明不只一个键值对,那这个收集线程是什么来头呢?以上两个问题,暂时缓一缓,先移步到Statck类,分析完后,才回过头来思考。
2.3 Statck 源码分析
看一段官方的介绍:
// we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
// than the stack owner recycles: when we run out of items in our stack we iterate this collection
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
// still recycling all items.
首先对于Statck目前,我只能理解放入elements中的对象,放入队列中对象,处了DefaultHandle中的 delayedRercycled有放入,但整个Recycler中未有相关使用语句,应该是Netty有额外的线程来辅助回收,这是个待定的问题?需要我慢慢去寻址。目前先按照常规流程讲解Statck,并抛出相关问题,希望大家予以帮助:
2.3.1 重要属性与构造函数解析
final Recycler<T> parent; //@1,Statck所在的对象池引用,回收器。 final Thread thread; // 该Statck对象关联的线程。 private DefaultHandle<?>[] elements; //存放具体对象的容器。 private final int maxCapacity; //允许存放的最大对象数,也就是elements数组的最大长度。 private int size; //当前elements中缓存对象的个数。 private volatile WeakOrderQueue head; //Statck的另外一个对象的存放容器,是一个链表,目前没有搞懂它在什么时候会初始化。 private WeakOrderQueue cursor, prev; Stack(Recycler<T> parent, Thread thread, int maxCapacity) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)]; }构造函数,就是初始化elements,maxCapacity、thread,parent等基本属性,引用链并未初始化。了解完数据结构相关的关联关系后,我们再次回到Recycler的入口方法,get与recycler方法:
@SuppressWarnings("unchecked") public final T get() { Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; } public final boolean recycle(T o, Handle<T> handle) { DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; } h.recycle(o); return true; }
上面两个方法,在了解其数据结构后,其实现思路应该很详细了,不做过多的讲解,值得留意的是Recycler是一个抽象类,需要有具体的子类在对象池中没有对象时,需要创建一个新的对象,具体创建对象的过程由其子类实现。
方法的签名:protected abstract T newObject(Handle<T> handle);
总结:
本文详细介绍了Netty线程本地内存的分配释放机制。同时提出Netty是将PooledByteBuf 与 PooledByteBuf执行的缓存区是单独进行管理的,PooledByteBuf指向的内存缓存区统一由Netty的内存分配,释放机制来管理,而与此同时,Netty实现了基于本地线程的对象池,用来重复利用PooledByteBuf本身这个对象,从Netty本地线程池获取的PooledByteBuf对象,不能直接使用,需要为它在申请内存进行初始化。本地线程池每一个线程关联一个Statck对象,该对象维护着两个仓库,一个是DefaultHandle[]
entries,用来存放对象池,还维护了另外一仓库,是用WeakOrderQueue 来维护的队列,由于整个Recycler类中,并没有对WeakOrderQueue 的 head属性进行初始化,这里的机制目前我没有想明白,只是猜测,Netty除了我们手工调用 Recycler.recycle方法外,应该有外部线程,比如定时任务之类的线程进行回收,目前未找到,由于目前对Netty的全貌并不理解,该部分的问题先留着,待后续深入后再研究,也希望志同道合的朋友提供帮助,再次十分感谢。
相关文章推荐
- Netty学习之旅------源码分析Netty内存池分配机制初探--PoolArena、PoolChunk、PoolSubpage等数据结构分析
- 深入研究Netty框架之ByteBuf功能原理及源码分析
- Netty学习之旅----源码分析内存分配与释放原理
- 【Netty4.X】Netty源码分析之ByteBuf(七)
- Netty源码分析:AbstractByteBuf
- 【图灵学院09】RPC底层通讯原理之Netty线程模型源码分析
- PooledDirectByteBuf源码分析
- netty源码分析之-ByteBuf详解(8)
- netty源码分析系列——ByteBuf&UnpooledByteBuf
- Netty学习之旅----ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf
- netty(十一)源码分析之ByteBuf 三
- Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
- netty(十)源码分析之ByteBuf
- netty(十一)源码分析之ByteBuf 二
- Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析
- netty源码分析(二十一)Netty数据容器ByteBuf底层数据结构深度剖析与ReferenceCounted初探
- Netty源码分析(五)—ByteBuf源码分析
- netty(十二)源码分析之ByteBuf 四
- 自顶向下深入分析Netty(九)--ByteBuf源码分析
- netty源码分析系列——PooledByteBuf&PooledByteBufAllocator