您的位置:首页 > 其它

高并发学习笔记(九)

2019-04-09 21:52 162 查看

一、ThreadLocal源码分析

1.什么是ThreadLocal

    ThreadLocal类是Java提供的一个线程私有的读写变量,可以理解为在Java的堆空间上专门划出一小块空间用于存放线程私有的数据或对象,线程之间是访问不到对方的ThreadLocal变量。下面看个用法示例:

/**
* ThreadLocal的用法示例
* Created by bzhang on 2019/3/21.
*/
public class TestThreadLocal {
private ThreadLocal<String> local = new ThreadLocal<>();    //直接new,即可创建
public String get(){
return local.get();     //获取ThreadLocal中的数据
}
public void put(String data){
local.set(data);  //往ThreadLocal中存放数据
}

public void remove(){
local.remove();   //删除ThreadLocal中的数据
}

public static void main(String[] args) {
TestThreadLocal test = new TestThreadLocal();
//在新建线程中存放数据
new Thread(new Runnable() {
@Override
public void run() {
test.put("gun");
System.out.println(Thread.currentThread().getName()+":"+test.get());
try {
TimeUnit.MILLISECONDS.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
test.remove();
System.out.println(Thread.currentThread().getName()+":"+test.get());
}
}).start();

try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//在主线程中获取local中的数据
System.out.println(Thread.currentThread().getName()+":"+test.get());
}
}

//结果:
Thread-0:gun
main:null
Thread-0:null

    ThreadLocal的用法十分简单,就像一个容器一样,可以存放数据(set),返回数据(get),可以删除数据(remove),唯一不太一样的地方就是这个ThreadLocal与线程挂钩,在不同线程中得到的结果是不一样的。

    在分析源码之前,先看看ThreadLocal的的结构及引用关系,大致如下图:

    其中ThreadLocalMap是threadLocal的一个内部类,而Entry又是 ThreadLocalMap的一个内部类,Entry用于存储一个ThreadLocal对应的数据(同一线程下),从这里我们就可以看出ThreadLocalMap和hashmap十分类似,ThreadLocalMap也是一个Map容器,存放着以threadLocal为key的键值对(hashmap的key可以自定义,而ThreadLocalMap的key只能是ThreadLocal),并且ThreadLocalMap的底层数据结构是用数组实现的(hashmap则是用数组+链表)。

    下面通过源码来看看ThreadLocalMap的set是如何实现的:

//ThreadLocal的构造器,可以看出,啥也没做
public ThreadLocal() {
}

//往ThreadLocal中设置值
public void set(T value) {
Thread t = Thread.currentThread();    //获取当前线程的引用
ThreadLocalMap map = getMap(t);    //获取t的对应ThreadLocalMap
if (map != null)
//若是map已经存在,则直接新增键值对,后面再讲
map.set(this, value);
else
createMap(t, value);    //若原先没有map,则以t和value新建对应的Map容器
}

//返回t线程对应的threadLocals,初始threadLocals为null
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

//新建t线程对应的ThreadLocalMap
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

    这里使用到了一个ThreadLocal的内部类,createMap时新建了一个ThreadLocalMap对象。

//ThreadLocalMap的构造函数,创建了容量为16的Entry类型的table数组
//将线程要存放的数据以键值对的形式存放在table数组中,其中键为ThreadLocal对象本身,值为要存放的数据
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
//确定键值对在数组中的位置,通过散列确定在table中位置
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;    //数据个数+1
setThreshold(INITIAL_CAPACITY);    //设置数组扩容的临界值
}

//Java中将引用分为强,软,弱,虚,Entry继承了WeakReference类
//表示Entry对象都将是弱引用对象,而被弱引用关联的对象只能生存到下一次垃圾收集之前,
//即当垃圾收集器工作时,无论当前内存是否足够都会回收掉只被弱引用关联的对象
//Entry是个键值对存储对象,value用于存放值,k则是ThreadLocal本身
static class Entry extends WeakReference<ThreadLocal<?>> {

Object value;    //存放值

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

//table数组的初始大小
private static final int INITIAL_CAPACITY = 16;

//Entry数组
private Entry[] table;

//table中数据的个数
private int size = 0;

//table数组下一次扩容的临界值,默认为0
private int threshold; // Default to 0

//设置table数组需要扩容的临界值,当数组使用了threshold的容量,就开始扩容
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

//用于生成ThreadLocal的hashcode
private final int threadLocalHashCode = nextHashCode();

//生成下一个hashcode的方法
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

//下一个hashcode的自增量
private static final int HASH_INCREMENT = 0x61c88647;

//原子类型,用于生成下一个ThreadLocal的hashcode
private static AtomicInteger nextHashCode =
new AtomicInteger();

    了解了set的过程,在来看看get的过程:

public T get() {
Thread t = Thread.currentThread();    //获取当前线程
ThreadLocalMap map = getMap(t);    //获取线程对应的threadLocals
//判断map是否为null,即是否设置过threadLocals
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
//判断e是否为null,即table数组中是否存在ThreadLocalMap对应的entry
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;    //存在返回值
return result;
}
}
return setInitialValue();    //还未初始化ThreadLocalMap,执行setInitialValue方法
}

//从table数组中取出对应的Entry
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);    //计算对应的在table数组中的位置
Entry e = table[i];
//判断table数组中i是否存在数据,且是不是同一个ThreadLocal
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);    //未找到对应的Entry对象时调用该方法
}

//遍历table数组,查找与key对应的entry
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();    //获取e中对应的ThreadLocal对象
if (k == key)    //key与e中的key对应时,说明找到了对应的entry,直接返回
return e;
if (k == null)    //当e的键为null,说明这个entry已经失效了,则需要清除
expungeStaleEntry(i);
else        //e的键不为null,但又不是key,则查询数组下个索引
i = nextIndex(i, len);
e = tab[i];
}
return null;    不存在对应的entry,返回null
}

//清除失效的entry中的数据,并更新table数组,且将table数组中无效的entry对应的索引位置赋为null
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

tab[staleSlot].value = null;    //清除value
tab[staleSlot] = null;    //清除数组中的entry
size--;    //数量-1

// Rehash until we encounter null
Entry e;
int i;
//循环遍历table数组,清除已失效数据,更新未失效数据再数组中的位置
for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);    //计算未失效数据的心索引
if (h != i) {    //判断未失效数据的索引是否改变,改变就更新索引,未改变不处理
tab[i] = null;
while (tab[h] != null)    //新索引中有数据,就往后移动一位,知道找到索引中没有数据的位置
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

//判断下一个数组索引是否越界,越界就返回数组的0索引
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

//ThreadLocalMap尚未初始化就调用ThreadLocal中get方法,就触发调用该方法
//该方法初始化一个ThreadLocalMap,ThreadLocalMap中仅有一个以当前ThreadLocal为键,值为null的Entry数据
private T setInitialValue() {
T value = initialValue();    //获取初始默认值,默认为null
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);    //获取当前线程对应的ThreadLocalMap
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}

//默认get不到值,返回null,可重写该方法
protected T initialValue() {
return null;
}

    知道了get,再回看set方法中的map.set方法:

//线程已有对应的ThreadLocalMap,则更新其value值
private void set(ThreadLocal<?> key, Object value) {

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);    //获取在table数组中的索引值

//当i位置的entry不为null时循环遍历table数组,
//即存在hash冲突,那么就要往后移动1位去在尝试插入,若还是冲突,继续后移,直到找到一个空位置
//若i位置的entry==null,表示该threadlocal可以直接往table数组中插入(没有hash冲突)
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

//找到对应的entry,更新value即可
//这里表示要插入的key已经存在,直接更新value就行了
if (k == key) {
e.value = value;
return;
}
//查找到的entry中k为null,说明该Entry关联的ThreadLocal被回收(key是弱引用,很可能失效)
if (k == null) {
replaceStaleEntry(key, value, i);    //整理table数组
return;
}
}

//创建要插入table数组的新Entry
tab[i] = new Entry(key, value);
int sz = ++size;    //数量+1
//新增数据后,若数组中的数据个数达到扩容临界值,
//则要进行数组扩容,且所有数据重新进行hash散列计算索引位置
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

//整理table
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

int slotToExpunge = staleSlot;
//查找table中的一个索引,该索引具有如下特点:
//该索引的前一个索引位置上没有entry(entry==null),且该索引对应的entry的key为null
//往前查找失效的Entry,找到的话就用slotToExpunge记录
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

//往后键为key的Entry
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();    //获取entry的键值
if (k == key) {    //若与要找的key相同
e.value = value;    //更新value值

//交换staleSlot(key对应的原索引位置)和i(查找到key现在所在的索引位置),减少下次查找路劲长度
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

//判断失效的entry对应的索引位置slotToExpunge和staleSlot是否相等,若相等就令staleSlot=i
//判断清理工作从哪个索引开始
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//清理table数组中对应entry的key为null的索引
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
//若entry已失效,记录索引
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

//如果在前面的查找并整理table中没有找到 我们要设置数据的 ThreadLocal,那么就需要构造一个新的Entry
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

//获取前一个索引,0的前一个为数组的最后一个索引
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

//清理Entry
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;    //是否移除的标志位
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);    //获取下一个索引
Entry e = tab[i];
//判断e是否为null,且e是否有键值
//当e不为null,且e的键为null,说明有要清除的entry
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);    //n减小一半
return removed;
}

    最后再来看看remove方法:

//remove方法比较简单,就是查找ThreadLocal在ThreadLocalMap的table数组中是否存在
//若是存在就将对应的entry的key置为null,然后清理table数组即可
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}

private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
//查找数组中是否有键为key的entry
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();    //将查找到的entry的key置为null
expungeStaleEntry(i);    //清理table数组
return;
}
}
}

public void clear() {
this.referent = null;
}

    OK,ThreadLocal的源码就分析到这,接下来讲一下ThreadLocal在高并发情形下的注意点:

    在使用ThreadLocal时,一定要在线程结束时执行remove方法回收资源,否则会有内存泄漏的风险。因为在多线程环境下,区分线程是否相同,只能通过判断线程的pid/cid。一个线程在结束后,若不回收ThreadLocal中的资源,操作系统在启动新的线程任务时可能会复用之前的线程(使用一些线程池时就是如此),导致该线程的ThreadLocal中的资源没有被回收,而出现内存泄漏。因此用完一定记得执行remove方法。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: