Netty中FastThreadLocal源码分析
Netty中使用FastThreadLocal替代JDK中的ThreadLocal【JAVA】ThreadLocal源码分析,其用法和ThreadLocal 一样,只不过从名字FastThreadLocal来看,其处理效率要比JDK中的ThreadLocal要高
在类加载的时候,先初始化了一个静态成员:
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
实际上FastThreadLocal的操作都是通过对InternalThreadLocalMap的操作来实现的,
而InternalThreadLocalMap是UnpaddedInternalThreadLocalMap的子类,UnpaddedInternalThreadLocalMap的定义比较简单:
class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal(); static final AtomicInteger nextIndex = new AtomicInteger(); Object[] indexedVariables; int futureListenerStackDepth; int localChannelReaderStackDepth; Map<Class<?>, Boolean> handlerSharableCache; IntegerHolder counterHashCode; ThreadLocalRandom random; Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache; Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache; StringBuilder stringBuilder; Map<Charset, CharsetEncoder> charsetEncoderCache; Map<Charset, CharsetDecoder> charsetDecoderCache; ArrayList<Object> arrayList; UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { this.indexedVariables = indexedVariables; } }
可以看到在类加载时,会初始化一个泛型为InternalThreadLocalMap的JDK的ThreadLocal对象作为其静态成员slowThreadLocalMap ,还有一个原子化的Integer静态成员nextIndex
InternalThreadLocalMap的定义如下:
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { private static final InternalLogger logger = InternalLoggerFactory.getInstance(InternalThreadLocalMap.class); private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8; private static final int STRING_BUILDER_INITIAL_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.initialSize", 1024); private static final int STRING_BUILDER_MAX_SIZE; public static final Object UNSET = new Object(); private BitSet cleanerFlags;
InternalThreadLocalMap的nextVariableIndex方法:
public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException("too many thread-local indexed variables"); } else { return index; } }
这是一个CAS滞后自增操作,获取nextIndex自增前的值,那么variablesToRemoveIndex初始化时就是0,且恒为0,nextIndex此时变成了1
FastThreadLocal对象的初始化:
private final int index = InternalThreadLocalMap.nextVariableIndex(); public FastThreadLocal() { }
由上面可知,index成员恒等于nextVariableIndex的返回值,nextIndex 的CAS操作保障了每个FastThreadLocal对象的index是不同的
首先看到set方法:
public final void set(V value) { if (value != InternalThreadLocalMap.UNSET) { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); if (this.setKnownNotUnset(threadLocalMap, value)) { this.registerCleaner(threadLocalMap); } } else { this.remove(); } }
只要set的value不是InternalThreadLocalMap.UNSET,会先调用InternalThreadLocalMap的get方法:
public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); return thread instanceof FastThreadLocalThread ? fastGet((FastThreadLocalThread)thread) : slowGet(); }
判断当前线程是否是FastThreadLocalThread,是则调用fastGet,否则调用slowGet
FastThreadLocalThread是经过包装后的Thread:
public class FastThreadLocalThread extends Thread { private final boolean cleanupFastThreadLocals; private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread() { this.cleanupFastThreadLocals = false; } public FastThreadLocalThread(Runnable target) { super(FastThreadLocalRunnable.wrap(target)); this.cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, FastThreadLocalRunnable.wrap(target)); this.cleanupFastThreadLocals = true; } public FastThreadLocalThread(String name) { super(name); this.cleanupFastThreadLocals = false; } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); this.cleanupFastThreadLocals = false; } public FastThreadLocalThread(Runnable target, String name) { super(FastThreadLocalRunnable.wrap(target), name); this.cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, FastThreadLocalRunnable.wrap(target), name); this.cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, FastThreadLocalRunnable.wrap(target), name, stackSize); this.cleanupFastThreadLocals = true; } public final InternalThreadLocalMap threadLocalMap() { return this.threadLocalMap; } public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; } public boolean willCleanupFastThreadLocals() { return this.cleanupFastThreadLocals; } public static boolean willCleanupFastThreadLocals(Thread thread) { return thread instanceof FastThreadLocalThread && ((FastThreadLocalThread)thread).willCleanupFastThreadLocals(); } }
如果看过我之前写的ThreadLocal源码分析,看到这就明白,JDK的ThreadLocal中很重要的一点是在Thread类中有一个ThreadLocalMap类型的成员,每个线程都维护这一张ThreadLocalMap,通过ThreadLocalMap来和ThreadLocal对象产生映射关系;而这里和JDK同理绑定的就是InternalThreadLocalMap。
fastGet方法:
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; }
这里也和JDK的ThreadLocal类似,判断FastThreadLocalThread 线程的threadLocalMap成员是否为null,若是null,则先创建一个InternalThreadLocalMap实例:
private InternalThreadLocalMap() { super(newIndexedVariableTable()); }
先调用newIndexedVariableTable方法:
private static Object[] newIndexedVariableTable() { Object[] array = new Object[32]; Arrays.fill(array, UNSET); return array; }
创建了一个大小为32的数组,并且用UNSET这个Object填充了整个数组,然后调用UnpaddedInternalThreadLocalMap的构造,令indexedVariables成员保存该数组
再来看slowGet方法:
private static InternalThreadLocalMap slowGet() { ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = (InternalThreadLocalMap)slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; }
可以看到,其实这里为了提高效率,并没有直接使用JDK的ThreadLocal,而是给当前非FastThreadLocalThread线程绑定了一个ThreadLocal<InternalThreadLocalMap>对象,避免直接使用JDK的ThreadLocal效率低。
回到FastThreadLocal的set方法,在取得到了当前线程的InternalThreadLocalMap成员后,调用setKnownNotUnset方法:
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { if (threadLocalMap.setIndexedVariable(this.index, value)) { addToVariablesToRemove(threadLocalMap, this); return true; } else { return false; } }
首先调用了InternalThreadLocalMap的setIndexedVariable方法:
public boolean setIndexedVariable(int index, Object value) { Object[] lookup = this.indexedVariables; if (index < lookup.length) { Object oldValue = lookup[index]; lookup[index] = value; return oldValue == UNSET; } else { this.expandIndexedVariableTableAndSet(index, value); return true; } }
因为index是不可更改的常量,所以这里有两种情况:
当indexedVariables这个Object数组的长度大于index时,直接将value放在indexedVariables数组下标为index的位置,返回oldValue是否等于UNSET,若是不等于UNSET,说明已经set过了,直进行替换,若是等于UNSET,还要进行后续的registerCleaner
当indexedVariables这个Object数组的长度小于等于index时,调用expandIndexedVariableTableAndSet方法扩容
expandIndexedVariableTableAndSet方法:
private void expandIndexedVariableTableAndSet(int index, Object value) { Object[] oldArray = this.indexedVariables; int oldCapacity = oldArray.length; int newCapacity = index | index >>> 1; newCapacity |= newCapacity >>> 2; newCapacity |= newCapacity >>> 4; newCapacity |= newCapacity >>> 8; newCapacity |= newCapacity >>> 16; ++newCapacity; Object[] newArray = Arrays.copyOf(oldArray, newCapacity); Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); newArray[index] = value; this.indexedVariables = newArray; }
如果读过HashMap源码的话对上述的位运算操作因该不陌生,这个位运算产生的newCapacity的值是大于oldCapacity的最小的二的整数幂(【Java】HashMap中的tableSizeFor方法)
然后申请一个newCapacity大小的数组,将原数组的内容拷贝到新数组,并且用UNSET填充剩余部分,还是将value放在下标为index的位置,用indexedVariables保存新数组。
setIndexedVariable成立后,setKnownNotUnset继续调用addToVariablesToRemove方法:
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set variablesToRemove; if (v != InternalThreadLocalMap.UNSET && v != null) { variablesToRemove = (Set)v; } else { variablesToRemove = Collections.newSetFromMap(new IdentityHashMap()); threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } variablesToRemove.add(variable); }
上面说过variablesToRemoveIndex恒为0,调用InternalThreadLocalMap的indexedVariable方法:
public Object indexedVariable(int index) { Object[] lookup = this.indexedVariables; return index < lookup.length ? lookup[index] : UNSET; }
由于variablesToRemoveIndex恒等于0,所以这里判断indexedVariables这个Object数组是否为空,若是为空,则返回第0个元素,若不是则返回UNSET
在addToVariablesToRemove中,接着对indexedVariables的返回值进行了判断,
判断不是UNSET,并且不等于null,则说明是set过的,然后将刚才的返回值强转为Set类型
若上述条件不成立,创建一个IdentityHashMap,将其包装成Set赋值给variablesToRemove,然后调用InternalThreadLocalMap的setIndexedVariable方法,这里就和上面不一样了,上面是将value放在下标为index的位置,而这里是将Set放在下标为0的位置。
看到这,再结合上面来看,其实已经有一个大致的想法了,一开始在set时,是将value放在InternalThreadLocalMap的Object数组下标为index的位置,然后在这里获取下标为0的Set,说明value是暂时放在下标为index的位置,然后判断下标为0的位置有没有Set,若是有,取出这个Set ,将当前FastThreadLocal对象放入Set中,则说明这个Set中存放的是FastThreadLocal集合
那么就有如下关系:
回到FastThreadLocal的set方法,在setKnownNotUnset成立后,调用registerCleaner方法:
private void registerCleaner(InternalThreadLocalMap threadLocalMap) { Thread current = Thread.currentThread(); if (!FastThreadLocalThread.willCleanupFastThreadLocals(current) && !threadLocalMap.isCleanerFlagSet(this.index)) { threadLocalMap.setCleanerFlag(this.index); } }
willCleanupFastThreadLocals的返回值在前面FastThreadLocalThread的初始化时就确定了,看到isCleanerFlagSet方法:
public boolean isCleanerFlagSet(int index) { return this.cleanerFlags != null && this.cleanerFlags.get(index); }
cleanerFlags 是一个BitSet对象,在InternalThreadLocalMap初始化时是null,
若不是第一次的set操作,则根据index,获取index在BitSet对应位的值
这里使用BitSet,使其持有的位和indexedVariables这个Object数组形成了一一对应关系,每一位都是0和1代表当前indexedVariables的对应下标位置的使用情况,0表示没有使用对应UNSET,1则代表有value
在上面条件成立的情况下,调用setCleanerFlag方法:
public void setCleanerFlag(int index) { if (this.cleanerFlags == null) { this.cleanerFlags = new BitSet(); } this.cleanerFlags.set(index); }
逻辑比较简单,判断cleanerFlags是否初始化,若没有,则立即初始化,再将cleanerFlags中对应index位的值设为1;
这里通过registerCleaner直接标记了所有set了value的下标可,为以后的removeAll 清除提高效率。
下来看FastThreadLocal的get方法:
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(this.index); if (v != InternalThreadLocalMap.UNSET) { return v; } else { V value = this.initialize(threadLocalMap); this.registerCleaner(threadLocalMap); return value; } }
和上面一样,先取得当前线程持有的InternalThreadLocalMap ,调用indexedVariable方法,根据当前FastThreadLocal的index定位,判断是否是UNSET(set过),若没有set过则和JDK一样调用initialize先set:
private V initialize(InternalThreadLocalMap threadLocalMap) { Object v = null; try { v = this.initialValue(); } catch (Exception var4) { PlatformDependent.throwException(var4); } threadLocalMap.setIndexedVariable(this.index, v); addToVariablesToRemove(threadLocalMap, this); return v; }
initialValue()方法就是对外提供的,需要手动覆盖:
protected V initialValue() throws Exception { return null; }
后面的操作就和set的逻辑一样。
remove方法:
public final void remove() { this.remove(InternalThreadLocalMap.getIfSet()); }
getIfSet方法:
public static InternalThreadLocalMap getIfSet() { Thread thread = Thread.currentThread(); return thread instanceof FastThreadLocalThread ? ((FastThreadLocalThread)thread).threadLocalMap() : (InternalThreadLocalMap)slowThreadLocalMap.get(); }
和上面的get方法思路相似,只不过在这里如果获取不到不会创建
然后调用remove重载:
public final void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap != null) { Object v = threadLocalMap.removeIndexedVariable(this.index); removeFromVariablesToRemove(threadLocalMap, this); if (v != InternalThreadLocalMap.UNSET) { try { this.onRemoval(v); } catch (Exception var4) { PlatformDependent.throwException(var4); } } } }
先检查threadLocalMap是否存在,若存在才进行后续操作:
调用removeIndexedVariable方法:
public Object removeIndexedVariable(int index) { Object[] lookup = this.indexedVariables; if (index < lookup.length) { Object v = lookup[index]; lookup[index] = UNSET; return v; } else { return UNSET; } }
和之前的setIndexedVariable逻辑相似,只不过现在是把index位置的元素设置为UNSET
接着调用removeFromVariablesToRemove方法:
private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v != InternalThreadLocalMap.UNSET && v != null) { Set<FastThreadLocal<?>> variablesToRemove = (Set)v; variablesToRemove.remove(variable); } }
之前说过variablesToRemoveIndex恒为0,在Object数组中下标为0存储的Set<FastThreadLocal<?>>集合(不为UNSET情况下),从集合中,将当前FastThreadLocal移除掉
最后调用了onRemoval方法,该方法需要由用户去覆盖:
protected void onRemoval(V value) throws Exception { }
removeAll方法,是一个静态方法:
public static void removeAll() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); if (threadLocalMap != null) { try { Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v != null && v != InternalThreadLocalMap.UNSET) { Set<FastThreadLocal<?>> variablesToRemove = (Set)v; FastThreadLocal<?>[] variablesToRemoveArray = (FastThreadLocal[])variablesToRemove.toArray(new FastThreadLocal[0]); FastThreadLocal[] var4 = variablesToRemoveArray; int var5 = variablesToRemoveArray.length; for(int var6 = 0; var6 < var5; ++var6) { FastThreadLocal<?> tlv = var4[var6]; tlv.remove(threadLocalMap); } } } finally { InternalThreadLocalMap.remove(); } } }
首先获取当前线程的InternalThreadLocalMap,若是存在继续后续操作:
通过indexedVariable方法,取出Object数组中下标为0的Set集合(如果不是UNSET情况下),将其转换为FastThreadLocal数组,遍历这个数组调用上面的remove方法。
FastThreadLocal源码分析到此结束。
- Netty 高性能之道 FastThreadLocal 源码分析(快且安全)
- ThreadLocal源码分析解密
- Netty 源码分析之 EventLoop(二) (最重要)
- netty源码分析之ChannelFuture
- Netty源码分析---EventLoopGroup和EventLoop
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- java 1.8 ThreadLocal源码分析
- ThreadLocal源码分析
- netty源码分析之-SimpleChannelInboundHandler与ChannelInboundHandlerAdapter详解(6)
- Netty 源码分析(二)
- rocketmq-remoting 源码分析NettyRemotingServer
- netty(十一)源码分析之ByteBuf 三
- netty源码分析的好文章
- netty源码分析系列——Bootstrap
- netty源码分析(十九)Netty项目开发过程中常见且重要事项分析
- Netty 源码分析-EventLoop
- Netty源码分析(二):服务端启动
- Netty源码分析(二)—客户端初始化
- 【JAVA】ThreadLocal源码分析