您的位置:首页 > 其它

ThreadLocal源码浅析

2017-11-23 00:00 471 查看
先说说本文的不足,只写出了“做了什么”,本人目前水平还写不出“为什么这么做”,欢迎交流,如有错误欢迎指出。

ThreadLocal的作用就是在多线程环境下,给予各线程一个本地域来存放本地数据(对象),这些数据不能被其他线程访问。底层实现原理是,ThreadLocal并不提供任何集合来存放数据,而是每个Thread线程对象自己维护自己的本地数据,所以这些数据只能被本线程访问,其他线程无法访问,ThreadLocal只是一个中间人的角色。

先来看看Thread线程对象是如何维护自己的数据的。

java.lang.Thread

众所周知,通过Thread类的静态方法currentThread()会返回当前线程的Thread对象,而此对象有一个属性叫threadLocals,类型是ThreadLocalMap:

ThreadLocal.ThreadLocalMap threadLocals = null;

看命名跟Map有关,实际使用的确跟Map相似,存放(set)线程的本地数据(对象)时要提供K-V对,获取(get)数据时要通过K取得V。但是底层实现却不是Map,ThreadLocalMap有个属性table,类型是Entry数组,就是用来存放当前线程的本地数据的,如何存放(set)和获得(get)呢?则要通过ThreadLocal这个中间人,而K是谁V是谁,详细看后面内容。

java.lang.ThreadLocal

ThreadLocal的用法通常是这样的,在业务类中定义:

ThreadLocal<User> userContext = new ThreadLocal<User>();
ThreadLocal<Cart> cartContext = new ThreadLocal<Cart>();

在多线程环境下,某个线程对这些ThreadLocal对象的get/set操作,访问的数据也只是当前线程本身的数据。例如jack用户的数据set到ThreadLocal中,那么get的时候只会取得自己的数据,而不是tom用户的数据。

User user = new User();
userContext.set(user);
user = userContext.get();
Cart cart = new Cart();
cartContext.set(cart);
cart = cartContext.get();

一个ThreadLocal对象只能保存一个类型的数据,set/get方法源码如下

set(T value) /get()

public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

set方法流程简单明了:

取得当前线程维护的ThreadLocalMap

存在就往里面存放数据,ThreadLocal对象作为K,本地数据作为V

不存在就调用createMap方法创建一个ThreadLocalMap并存放数据

get方法也很简单,大概流程是:

取得当前线程维护的ThreadLocalMap

存在就根据当前ThreadLocal对象取得本地数据

不存在就调用setInitialValue方法初始化当前线程的ThreadLocalMap

源码就不贴出来了

setInitialValue()

它是private的,它首先调用了protected的initialValue方法对当前线程的ThreadLocalMap进行初始化,但它返回的是null,那么初始化后ThreadLocalMap保存的V也将是null。如果要保存的本地数据是一个集合,最好重写initialValue方法。

private ThreadLocal<List> myItems = new ThreadLocal<List>(){
@Override
protected List initialValue() {
return n
3ff0
ew ArrayList<String>();
}
};

那么初始化后ThreadLocalMap保存的V就是一个空的ArrayList。之后的流程也很简单,跟上面的set方法无异。

threadLocalHashCode属性

静态属性threadLocalHashCode,是一个原子类AtomicInteger。如果创建多个ThreadLocal对象,这个值会自增。例如上面例子userContext和cartContext的threadLocalHashCode是不一样的。这个值用于计算一个索引,指明当前线程要保存的本地数据(对象)在ThreadLocalMap的table数组中的位置。

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}


java.lang.ThreadLocal.ThreadLocalMap

ThreadLocalMap是ThreadLocal的静态内部类,先说说它有哪些属性:

Entry[] table Entry数组用于保存当前线程所有本地数据(对象)

int INITIAL_CAPACITY Entry数组的默认length,即16

int size Entry对象的数量,也即表示当前线程本地数据(对象)的数量

int threshold Entry数组的阈值,是length的2/3

再声明一些名词,方便理解:

一个数组元素称为一个槽(slot),槽是空的(null)或是一个Entry对象

Entry对象弱引用的ThreadLocal称为key,跟Entry对象的value属性形成K-V对

如果Entry对象的key为null,意味着key已经被垃圾回收,Entry对象变得“陈旧(stale)”,随时要被删除(expunge)。相反,非陈旧的就是“活动的”。

静态内部类Entry类是弱引用的子类,弱引用一个ThreadLocal对象,value属性则是要保存的数据(对象)本身

static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

构造方法

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

可以看到,Entry对象在数组中的位置i,是按照ThreadLocal的threadLocalHashCode进行计算的,ThreadLocal只是一个key的角色。

int i = key.threadLocalHashCode & (len-1);

但是要注意,Entry对象不一定都放在这个table[i]上,有可能临时在其他索引位置例如table[i+n]。因为Entry对象弱引用的ThreadLocal对象可能随时被垃圾回收,这样会导致Entry数组table的“碎片化”。ThreadLocalMap有一种机制,会对table进行整理,类似磁盘的碎片整理,这就导致新Entry对象不在正确的table[i]上,后面详说。

set(ThreadLocal<?> key, Object value)

流程如下:

首先根据key计算出正确的位置i,然后从table[i]开始遍历所有连续的非空槽。

如果找到该key的Entry对象,则更新该Entry对象。

如果找到陈旧的(stale)Entry对象,调用replaceStaleEntry方法。

如果都没有,就在下一个空槽中保存新的Entry对象

最后调用cleanSomeSlots方法尝试整理一下table

private void set(ThreadLocal<?> key, Object value) {

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

//从table[i]开始遍历所有连续的非空槽
for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {

ThreadLocal<?> k = e.get();

if (k == key) {//找到该key的Entry对象
e.value = value;
return;
}

if (k == null) {//找到陈旧的(stale)Entry对象
replaceStaleEntry(key, value, i);
return;
}
}

//如果都没有,就在下一个空槽中保存新的Entry对象
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

replaceStaleEntry方法和cleanSomeSlots方法干了什么,后面详说。

按照之前的ThreadLocal用法的代码,一个线程往两个ThreadLocal对象分别存放(set)User和Cart,那么各对象的示例图如下,ThreadLocal只是一个key角色:



nextIndex(int i, int len)/prevIndex(int i, int len)

如字面意思,分别是:

返回i的下一个槽的index

返回i的前一个槽的index

table是个数组,但在这里逻辑上是首尾相连的,如果i是0,即数组的头部,那么prevIndex方法返回的就是数组尾部的index。

expungeStaleEntry(int staleSlot)

在说cleanSomeSlots方法前,先说这个方法。按字面意思,删除(expunge)陈旧(stale)的槽(slot)。此方法针对的是一个从table[staleSlot]开始的连续非空的槽,删除里面所有陈旧的Entry对象。

首先,因为当前table[staleSlot]是陈旧的,所以先删除它,代码如下:

table[staleSlot].value = null;
table[staleSlot] = null;
size--;

然后对后面连续非空的槽进行遍历,进行如下操作:

是陈旧的Entry对象,删除

是活动的Entry对象,进行“整理”,Entry对象在正确的槽就不理会,否则就清空当前槽,Entry对象移去后面空槽中,直到Entry对象在正确的槽中。

Entry e;
int i;
//遍历后面连续非空的槽
for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {

ThreadLocal<?> k = e.get();

//是陈旧的Entry对象,删除
if (k == null) {
e.value = null;
tab[i] = null;
size--;

} else {//是活动的Entry对象

int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;

用图例表示,蓝色代表非空的槽,有红X代表陈旧的Entry对象,假设调用expungStaleEntry(3),那么开始时遍历范围是table[3]~table[6]这些Entry对象。



删除table[3]后检查table[4],假设通过它的ThreadLocal计算的index不是4,也即目前位置不是正确的,那么则将它后移到table[7]



但处理完table[5]后,table数组如下



然而遍历还没完,你会发现,遍历的终点不再是table[6],而是table[8],因为是连续的。所以这个方法的遍历范围可能一直在变,努力将里面的Entry对象(数量可能变多)放到正确的位置为止。

最后所有Entry对象都在正确的槽,返回空槽的位置i

cleanSomeSlots(int i, int n)

如字面意思,删除一些陈旧的Entry对象,如果遇到连续的空槽比较多,它什么都不干。

private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;

do {
i = nextIndex(i, len);
Entry e = tab[i];

if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);

return removed;
}

这个do-while语句的作用是判断当前table[i]后面的槽是否有一定数量的连续空槽(该数量根据n计算do-while循环次数,也即空槽数,n有时是size有时是length),有则不理会返回false。如果有非空槽,就调用expungeStaleEntry方法删除一些陈旧的Entry对象并整理,从新设置循环判断次数(n是length了),直到没有了,最后返回true。

这个do-while循环设计比较巧妙,大概就是这个意思。

由于此方法从空槽开始,expungeStaleEntry方法又返回一个空槽的index,所以两个方法经常搭配出现。

rehash()

当cleanSomeSlots方法没有删除过一些Entry对象且size大于阈值时,才触发这个方法。

首先调用expungeStaleEntries()销毁table中所有陈旧的Entry,以释放资源。

如果size超过一定量(length的1/2),就调用resize()扩容为length的2倍,重新将各Entry放到新table里(索引公式中的length是新的值)

replaceStaleEntry(ThreadLocal<?> key, Object value,int staleSlot)

用于更新当前key的陈旧Entry对象,并对table进行整理。

第一个for,向前遍历连续的非空槽,找出最后找到的陈旧Entry对象的索引,赋值给slotToExpunge

int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);(e = tab[i]) != null;i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

看图,例如调用的是replaceStaleEntry(key, value, 5),slotToExpunge就是3



第二个for,向后遍历连续的非空槽(如上图就是table[6]~table[8]),找到key的那个Entry对象就更新它并进行位置调换。例如是table[8],那么先更新table[8]的value,再跟table[5]互换



最后调用expungeStaleEntry方法(从slotToExpunge也即table[3]开始)和cleanSomeSlots方法进行整理,结束方法。

for (int i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {

ThreadLocal<?> k = e.get();

//找到key的那个Entry对象
if (k == key) {
e.value = value;

tab[i] = tab[staleSlot];
tab[staleSlot] = e;

if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

//找不到key的那个Entry对象
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);

第二个for结束还是找不到key的那个Entry对象,则覆盖当前的槽,即table[5]从新设置key和value。



最后也是调用expungeStaleEntry方法(从slotToExpunge也即table[3]开始)和cleanSomeSlots方法进行整理,结束方法。

getEntry(ThreadLocal<?> key)

流程如下:

首先根据key计算出正确的位置i

如果table[i]存在且key对应得上(即要取得的Entry对象在正确位置上),就返回该Entry对象

如果table[i]存在但key对应不上(Entry对象不在正确位置上),那么调用getEntryAfterMiss方法继续找

getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)

流程自然是向后面连续的非空槽中逐个找了,代码简单明了不细说。

END

比较复杂的是这三个方法:

expungeStaleEntry(int staleSlot)

cleanSomeSlots(int i, int n)

replaceStaleEntry(ThreadLocal<?> key, Object value,int staleSlot)

上面已经解答了这三个方法“做了什么”,但是还回答不了“为什么这么做”的问题,因为涉及到一些高效的算法,我在缺乏相关理论支持的情况下,难以理解,欢迎跟大家进一步交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: