    在讲上篇Netty内存分配的时候,没有考虑本地线程的缓存,也就是Netty在分配内存时,首先尝试从线程本地缓存中去申请,如果申请失败,才从全局分配。本章就重点分析线程缓存相关的实现。首先我们将目光投向PooledByteBufAllocator的 final PoolThreadLocalCache threadCache;该类实现的机制类似ThreadLocal,我们重点看一下PooledThreadLocalCache的源码:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();

protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;

if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;

return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,

protected void onRemoval(PoolThreadCache value) {
1.1 PoolThreadCache 属性与构造函数分析
final PoolArena<byte[]> heapArena;         //使用轮叫轮询机制,每个线程从heapArena[]中获取一个,用于内存分配。
final PoolArena<ByteBuffer> directArena;          //同上

// Hold the caches for the different size classes, which are tiny, small and normal.     //针对不同大小,线程缓存的内存
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;

private int allocations;

private final Thread thread = Thread.currentThread();            //当前线程
private final Runnable freeTask = new Runnable() {               //线程消亡后,释放资源,下文会重点讲解。
public void run() {

// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

* @param  heapArena  线程使用的PoolArena.HeapArena
* @param  directArena 线程使用的PoolArena.DirectArena
* @param tinyCacheSize, tiny内存缓存的个数。默认为512
* @param  smallCacheSize small内存缓存的个数,默认为256个
* @param normalCacheSize normalCacheSize缓存的个数,默认为64
* @param maxCacheBufferCapacity
*         normalHeapCaches中单个缓存区域的最大大小,默认为32k  也就是normalHeapCaches[length-1]中缓存的最大内存空间
* @param freeSweepAllocationThreshold  在本地线程每分配freeSweepAllocationThreshold 次内存后,检测一下是否需要释放内存。
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
if (maxCachedBufferCapacity < 0) {
throw new IllegalArgumentException("maxCachedBufferCapacity: "
+ maxCachedBufferCapacity + " (expected: >= 0)");
if (freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ maxCachedBufferCapacity + " (expected: > 0)");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);

numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);

numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);     //@1
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;

// The thread-local cache will keep a list of pooled buffers which must be returned to
// the pool when the thread is not alive anymore.
ThreadDeathWatcher.watch(thread, freeTask);

代码@1,创建createNormalCaches 。
由于PoolThreadCache的设计理念与PoolArena一样,本身并不涉及到具体内存的存储,PoolThreadCache内部维护MemoryRegionCache[] tinySubpageHeapCaches,MemoryRegionCache[]
smallSubpageHeapCaches,其数组长度与PoolArena相同,MemoryRegionCaches[] normalHeapCaches,缓存的是noraml内存,Netty把大于pageSize小于chunkSize的空间成为normal内存。normalHeapCaches[1] 是normalHeapCaches[0] 的2倍, 先重点关注PoolThreadCache createNormalCaches 源码:
private static <T> NormalMemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);      //@1
int arraySize = Math.max(1, max / area.pageSize);                             //@2

NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
return cache;
} else {
return null;

参数 numCaches,为SubPageMemoryRegionCache[]数组的长度,而cacheSize,为每一个SubPageMemoryRegionCache中缓存的内存个数,也就是SubPageMemoryRegionCache中entries[]的长度。这里的cacheSize,就是PooledByteBufAllocator
代码@1,其实应该不需要与area.chunkSize做比较,因为如果超过chunkSize的内存,netty不会重复使用,直接在整个堆空间或堆外空间申请并释放。这里可能是出于代码的自我保护,得到normalHeapCaches中单个 Entry所持有的内存不超过该值。
代码@2,计算normalHeapCaches数组的长度,这里有优化的空间,用位运算:int arraySize = Math.max(1,  max >> numShiftsNormalHeap   ),其中numShiftsNormalHeap为
log2(pageSize)。这样做的原因,也就是normalHeapCaches 数组中的元素的大小,是以2的幂倍pageSize递增的。cacheSize默认为64,参数值来源于PooledByteBufAllocator。接下来关注PoolThreadCache的allocateTiny方法:
1.2  PoolThreadCache allocateTiny方法
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
return cache(tinySubPageHeapCaches, idx);
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
int idx = log2(normCapacity >> numShiftsNormalHeap);      //@1
return cache(normalHeapCaches, idx);

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
boolean allocated = cache.allocate(buf, reqCapacity);          //@2
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();                                                                                   //@3
return allocated;

代码@2,MeomoryRegionCache内部持有的 Entry entries[]数组是真正持有内存的单元,故现在将重点转移到MemoryRegionCache的讲解中。
代码@3,如果分配次数达到freeSweepAllocationThreshold,进行一次尝试释放一次。具体代码见 trim()方法的讲解。
1.2.2 关于PoolThreadCache allocateForTiny 之MemoryRegionCache 源码解读【针对1.2代码@2】
private final Entry<T>[] entries;            //MemoryRegionCache真正持有内存的地方
private static final class Entry<T> {
PoolChunk<T> chunk;      //具体的PoolChunk
long handle;                      //内存持有偏移量,高32位保存的是bitmaIdx,低32位保存的是memoryMapIdx
private final int maxUnusedCached;   //表示允许的最大的没有使用的内存数量(已经被缓存),默认为size的一半。
private int head;                                    // 作用类似于ByteBuf的readerIndex,从该位置获取一个缓存的Entiry。

private int tail;                                       // 作用类似于ByteBuf的writerIndex,从该位置增加一个加入一个新的Entity
private int maxEntriesInUse;                // 在使用中最大的entry数量
private int entriesInUse;                       // 目前使用中的entry数量
MemoryRegionCache(int size) {  // size 默认的大小为  512, 256, 64
entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>();
maxUnusedCached = size / 2;  //允许被缓存,但没有使用的最大数量,超过该值,则会触发内存释放操作。
maxUnusedCached :   256,128,32,为size的一半;head:0
;tail:0 ; maxEntriesInUse : 0; entriesInUse
: 0
* Allocate something out of the cache if possible and remove the entry from the cache.
public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = entries[head];      //@1
if (entry.chunk == null) {                  //@2
return false;
entriesInUse ++;                               //@3
if (maxEntriesInUse < entriesInUse) {
maxEntriesInUse = entriesInUse;
initBuf(entry.chunk, entry.handle, buf, reqCapacity);     //@4
// only null out the chunk as we only use the chunk to check if the buffer is full or not.
entry.chunk = null;    //@5
head = nextIdx(head);    //@6
return true;

代码@6,head指针加一,如果超过entries的length,则重新从0开始,其实也就是  (head + 1) % (entires.length - 1),这里使用的是位运算。如果成功分配,则返回true, 结束本次内存的分配。

1.2.3 关于PoolThreadCache allocateForTiny 之代码@3,trim方法详解:
void trim() {
private static void trim(MemoryRegionCache<?>[] caches) {
if (caches == null) {
for (MemoryRegionCache<?> c: caches) {
private static void trim(MemoryRegionCache<?> cache) {
if (cache == null) {
* Free up cached {@link PoolChunk}s if not allocated frequently enough.
private void trim() {
int free = size() - maxEntriesInUse;        //@1
entriesInUse = 0;
maxEntriesInUse = 0;                             //@2

if (free <= maxUnusedCached) {           //@3

int i = head;
for (; free > 0; free--) {
if (!freeEntry(entries[i])) {
// all freed
i = nextIdx(i);

// Update head to point to te correct entry
// See https://github.com/netty/netty/issues/2924 head = i;

代码@1,size()方法返回的是  (tail-head) & (length-1),表示当前缓存了但未被使用的个数。maxEntriesInUse的值,其实就是entiryesInUse的值。
@SuppressWarnings({ "unchecked", "rawtypes" })
private static boolean freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
if (chunk == null) {
return false;
// need to synchronize on the area from which it was allocated before.
synchronized (chunk.arena) {
chunk.parent.free(chunk, entry.handle);
entry.chunk = null;
return true;
扫描一下MemoryRegionCache类,还有一个方法我们未曾分析过,就是add方法,默认一开始MemoryRegionCache类中的Entry[] entries中的PoolChunk与handle都是空的,只有通过该add方法,将线程用过的内存缓存起来才能重复使用。我们要养成这样一个习惯,一个ByteBuf用过后,需要调用realse方法将其释放,具体到池化的PooledByteBuf,调用其realse方法,并不会将内存直接返还给JVM堆,而是放入到内存池,供重复使用,由于引入了线程本地缓存,所以在调用PooledByteBuf的release方法时,并不会将它立马返回给内存池(PoolArena),而是放入到本地线程缓存中。
* Add to cache if not already full.
public boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = entries[tail];
if (entry.chunk != null) {
// cache is full
return false;
entriesInUse --;

entry.chunk = chunk;
entry.handle = handle;
tail = nextIdx(tail);
return true;


private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);

static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();      //@1
return buf;

关注代码@1,@2创建一个PooledHeapByteBuf,是从一个静态变量 RECYLER的get方法中获取,代码@2的写法是不是和ThreadLocal的使用非常类似,所以本专题的主角,就非Recycler莫属了。
private static final int DEFAULT_MAX_CAPACITY;       //对象池默认的最大容量
private static final int INITIAL_CAPACITY;                   //初始容量
private final int maxCapacity;   //对象池的容量,由构造方法中进行初始化,默认为DEFAULT_MAX_CAPACITY。
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
public final T get() {
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
return (T) handle.value;

public final boolean recycle(T o, Handle<T> handle) {
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;

return true;

看到这里,为了摸清楚Recycler的内部实现原理,我们只能将目光先投向Stack类。但一看又发现Statck内部维护着这样一个数据结构:DefaultHandle<?>[] elements;也就是一个Statck类维护这样一个DefaultHandle数组,所以,我们先将目光锁定在DefaultHandle上:
2.2 DefaultHandle源码详解
public interface Handle<T> {    //负责对象回收接口
void recycle(T object);
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;                     //@1
private int recycleId;                             //@2,这两个属性待分解

private Stack<?> stack;                        //该Handle所在的Statck对象,上面也谈到,Statck维护一个Handle数组
private Object value;                            //该对象就是对象池缓存的对象,这里用 private T value更合适。

DefaultHandle(Stack<?> stack) {         // 构造函数
this.stack = stack;

public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
Thread thread = Thread.currentThread();         //@3
if (thread == stack.thread) {                              //@4
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
//@5 start
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(stack);
if (queue == null) {
delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
queue.add(this);  // @5 end

代码@3 获取当前释放的线程。
将Handle对象放入到收集线程的本地缓存中,存放的是一个 Map<Stack<?>, WeakOrderQueue>,然后将Handle加入到WeakOrderQueue中,WeakOrderQueue里面存放的对象是基于一个WeakReference,弱引用,在垃圾回收的时候会被清除掉,放入进对象池中的对象,在什么地方取出来呢?是在Statck的pop方法中吗?有待进一步跟踪学习,还有根据这个收集线程的本地变量存放的类型来看,是个Map,说明不只一个键值对,那这个收集线程是什么来头呢?以上两个问题,暂时缓一缓,先移步到Statck类,分析完后,才回过头来思考。
2.3 Statck 源码分析

// we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
// than the stack owner recycles: when we run out of items in our stack we iterate this collection
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
// still recycling all items.

首先对于Statck目前,我只能理解放入elements中的对象,放入队列中对象,处了DefaultHandle中的 delayedRercycled有放入,但整个Recycler中未有相关使用语句,应该是Netty有额外的线程来辅助回收,这是个待定的问题?需要我慢慢去寻址。目前先按照常规流程讲解Statck,并抛出相关问题,希望大家予以帮助:
2.3.1 重要属性与构造函数解析
final Recycler<T> parent;       //@1,Statck所在的对象池引用,回收器。
final Thread thread;                // 该Statck对象关联的线程。
private DefaultHandle<?>[] elements;   //存放具体对象的容器。
private final int maxCapacity;                 //允许存放的最大对象数,也就是elements数组的最大长度。
private int size;                                        //当前elements中缓存对象的个数。

private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev;

Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
public final T get() {
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
return (T) handle.value;
public final boolean recycle(T o, Handle<T> handle) {
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;

return true;


方法的签名:protected abstract T newObject(Handle<T> handle);

    本文详细介绍了Netty线程本地内存的分配释放机制。同时提出Netty是将PooledByteBuf 与 PooledByteBuf执行的缓存区是单独进行管理的,PooledByteBuf指向的内存缓存区统一由Netty的内存分配,释放机制来管理,而与此同时,Netty实现了基于本地线程的对象池,用来重复利用PooledByteBuf本身这个对象,从Netty本地线程池获取的PooledByteBuf对象,不能直接使用,需要为它在申请内存进行初始化。本地线程池每一个线程关联一个Statck对象,该对象维护着两个仓库,一个是DefaultHandle[]
entries,用来存放对象池,还维护了另外一仓库,是用WeakOrderQueue 来维护的队列,由于整个Recycler类中,并没有对WeakOrderQueue 的 head属性进行初始化,这里的机制目前我没有想明白,只是猜测,Netty除了我们手工调用 Recycler.recycle方法外,应该有外部线程,比如定时任务之类的线程进行回收,目前未找到,由于目前对Netty的全貌并不理解,该部分的问题先留着,待后续深入后再研究,也希望志同道合的朋友提供帮助,再次十分感谢。
