高并发学习笔记(九)
一、ThreadLocal源码分析
1.什么是ThreadLocal
ThreadLocal类是Java提供的一个线程私有的读写变量,可以理解为在Java的堆空间上专门划出一小块空间用于存放线程私有的数据或对象,线程之间是访问不到对方的ThreadLocal变量。下面看个用法示例:
/** * ThreadLocal的用法示例 * Created by bzhang on 2019/3/21. */ public class TestThreadLocal { private ThreadLocal<String> local = new ThreadLocal<>(); //直接new,即可创建 public String get(){ return local.get(); //获取ThreadLocal中的数据 } public void put(String data){ local.set(data); //往ThreadLocal中存放数据 } public void remove(){ local.remove(); //删除ThreadLocal中的数据 } public static void main(String[] args) { TestThreadLocal test = new TestThreadLocal(); //在新建线程中存放数据 new Thread(new Runnable() { @Override public void run() { test.put("gun"); System.out.println(Thread.currentThread().getName()+":"+test.get()); try { TimeUnit.MILLISECONDS.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } test.remove(); System.out.println(Thread.currentThread().getName()+":"+test.get()); } }).start(); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } //在主线程中获取local中的数据 System.out.println(Thread.currentThread().getName()+":"+test.get()); } } //结果: Thread-0:gun main:null Thread-0:null
ThreadLocal的用法十分简单,就像一个容器一样,可以存放数据(set),返回数据(get),可以删除数据(remove),唯一不太一样的地方就是这个ThreadLocal与线程挂钩,在不同线程中得到的结果是不一样的。
在分析源码之前,先看看ThreadLocal的的结构及引用关系,大致如下图:
其中ThreadLocalMap是threadLocal的一个内部类,而Entry又是 ThreadLocalMap的一个内部类,Entry用于存储一个ThreadLocal对应的数据(同一线程下),从这里我们就可以看出ThreadLocalMap和hashmap十分类似,ThreadLocalMap也是一个Map容器,存放着以threadLocal为key的键值对(hashmap的key可以自定义,而ThreadLocalMap的key只能是ThreadLocal),并且ThreadLocalMap的底层数据结构是用数组实现的(hashmap则是用数组+链表)。
下面通过源码来看看ThreadLocalMap的set是如何实现的:
//ThreadLocal的构造器,可以看出,啥也没做 public ThreadLocal() { } //往ThreadLocal中设置值 public void set(T value) { Thread t = Thread.currentThread(); //获取当前线程的引用 ThreadLocalMap map = getMap(t); //获取t的对应ThreadLocalMap if (map != null) //若是map已经存在,则直接新增键值对,后面再讲 map.set(this, value); else createMap(t, value); //若原先没有map,则以t和value新建对应的Map容器 } //返回t线程对应的threadLocals,初始threadLocals为null ThreadLocalMap getMap(Thread t) { return t.threadLocals; } //新建t线程对应的ThreadLocalMap void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
这里使用到了一个ThreadLocal的内部类,createMap时新建了一个ThreadLocalMap对象。
//ThreadLocalMap的构造函数,创建了容量为16的Entry类型的table数组 //将线程要存放的数据以键值对的形式存放在table数组中,其中键为ThreadLocal对象本身,值为要存放的数据 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; //确定键值对在数组中的位置,通过散列确定在table中位置 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; //数据个数+1 setThreshold(INITIAL_CAPACITY); //设置数组扩容的临界值 } //Java中将引用分为强,软,弱,虚,Entry继承了WeakReference类 //表示Entry对象都将是弱引用对象,而被弱引用关联的对象只能生存到下一次垃圾收集之前, //即当垃圾收集器工作时,无论当前内存是否足够都会回收掉只被弱引用关联的对象 //Entry是个键值对存储对象,value用于存放值,k则是ThreadLocal本身 static class Entry extends WeakReference<ThreadLocal<?>> { Object value; //存放值 Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } //table数组的初始大小 private static final int INITIAL_CAPACITY = 16; //Entry数组 private Entry[] table; //table中数据的个数 private int size = 0; //table数组下一次扩容的临界值,默认为0 private int threshold; // Default to 0 //设置table数组需要扩容的临界值,当数组使用了threshold的容量,就开始扩容 private void setThreshold(int len) { threshold = len * 2 / 3; } //用于生成ThreadLocal的hashcode private final int threadLocalHashCode = nextHashCode(); //生成下一个hashcode的方法 private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } //下一个hashcode的自增量 private static final int HASH_INCREMENT = 0x61c88647; //原子类型,用于生成下一个ThreadLocal的hashcode private static AtomicInteger nextHashCode = new AtomicInteger();
了解了set的过程,在来看看get的过程:
public T get() { Thread t = Thread.currentThread(); //获取当前线程 ThreadLocalMap map = getMap(t); //获取线程对应的threadLocals //判断map是否为null,即是否设置过threadLocals if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); //判断e是否为null,即table数组中是否存在ThreadLocalMap对应的entry if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; //存在返回值 return result; } } return setInitialValue(); //还未初始化ThreadLocalMap,执行setInitialValue方法 } //从table数组中取出对应的Entry private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); //计算对应的在table数组中的位置 Entry e = table[i]; //判断table数组中i是否存在数据,且是不是同一个ThreadLocal if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); //未找到对应的Entry对象时调用该方法 } //遍历table数组,查找与key对应的entry private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); //获取e中对应的ThreadLocal对象 if (k == key) //key与e中的key对应时,说明找到了对应的entry,直接返回 return e; if (k == null) //当e的键为null,说明这个entry已经失效了,则需要清除 expungeStaleEntry(i); else //e的键不为null,但又不是key,则查询数组下个索引 i = nextIndex(i, len); e = tab[i]; } return null; 不存在对应的entry,返回null } //清除失效的entry中的数据,并更新table数组,且将table数组中无效的entry对应的索引位置赋为null private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; tab[staleSlot].value = null; //清除value tab[staleSlot] = null; //清除数组中的entry size--; //数量-1 // Rehash until we encounter null Entry e; int i; //循环遍历table数组,清除已失效数据,更新未失效数据再数组中的位置 for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); //计算未失效数据的心索引 if (h != i) { //判断未失效数据的索引是否改变,改变就更新索引,未改变不处理 tab[i] = null; while (tab[h] != null) //新索引中有数据,就往后移动一位,知道找到索引中没有数据的位置 h = nextIndex(h, len); tab[h] = e; } } } return i; } //判断下一个数组索引是否越界,越界就返回数组的0索引 private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } //ThreadLocalMap尚未初始化就调用ThreadLocal中get方法,就触发调用该方法 //该方法初始化一个ThreadLocalMap,ThreadLocalMap中仅有一个以当前ThreadLocal为键,值为null的Entry数据 private T setInitialValue() { T value = initialValue(); //获取初始默认值,默认为null Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); //获取当前线程对应的ThreadLocalMap if (map != null) map.set(this, value); else createMap(t, value); return value; } //默认get不到值,返回null,可重写该方法 protected T initialValue() { return null; }
知道了get,再回看set方法中的map.set方法:
//线程已有对应的ThreadLocalMap,则更新其value值 private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); //获取在table数组中的索引值 //当i位置的entry不为null时循环遍历table数组, //即存在hash冲突,那么就要往后移动1位去在尝试插入,若还是冲突,继续后移,直到找到一个空位置 //若i位置的entry==null,表示该threadlocal可以直接往table数组中插入(没有hash冲突) for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); //找到对应的entry,更新value即可 //这里表示要插入的key已经存在,直接更新value就行了 if (k == key) { e.value = value; return; } //查找到的entry中k为null,说明该Entry关联的ThreadLocal被回收(key是弱引用,很可能失效) if (k == null) { replaceStaleEntry(key, value, i); //整理table数组 return; } } //创建要插入table数组的新Entry tab[i] = new Entry(key, value); int sz = ++size; //数量+1 //新增数据后,若数组中的数据个数达到扩容临界值, //则要进行数组扩容,且所有数据重新进行hash散列计算索引位置 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } //整理table private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; int slotToExpunge = staleSlot; //查找table中的一个索引,该索引具有如下特点: //该索引的前一个索引位置上没有entry(entry==null),且该索引对应的entry的key为null //往前查找失效的Entry,找到的话就用slotToExpunge记录 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; //往后键为key的Entry for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); //获取entry的键值 if (k == key) { //若与要找的key相同 e.value = value; //更新value值 //交换staleSlot(key对应的原索引位置)和i(查找到key现在所在的索引位置),减少下次查找路劲长度 tab[i] = tab[staleSlot]; tab[staleSlot] = e; //判断失效的entry对应的索引位置slotToExpunge和staleSlot是否相等,若相等就令staleSlot=i //判断清理工作从哪个索引开始 if (slotToExpunge == staleSlot) slotToExpunge = i; //清理table数组中对应entry的key为null的索引 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } //若entry已失效,记录索引 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } //如果在前面的查找并整理table中没有找到 我们要设置数据的 ThreadLocal,那么就需要构造一个新的Entry tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); } //获取前一个索引,0的前一个为数组的最后一个索引 private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } //清理Entry private boolean cleanSomeSlots(int i, int n) { boolean removed = false; //是否移除的标志位 Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); //获取下一个索引 Entry e = tab[i]; //判断e是否为null,且e是否有键值 //当e不为null,且e的键为null,说明有要清除的entry if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); //n减小一半 return removed; }
最后再来看看remove方法:
//remove方法比较简单,就是查找ThreadLocal在ThreadLocalMap的table数组中是否存在 //若是存在就将对应的entry的key置为null,然后清理table数组即可 public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); } private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); //查找数组中是否有键为key的entry for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); //将查找到的entry的key置为null expungeStaleEntry(i); //清理table数组 return; } } } public void clear() { this.referent = null; }
OK,ThreadLocal的源码就分析到这,接下来讲一下ThreadLocal在高并发情形下的注意点:
在使用ThreadLocal时,一定要在线程结束时执行remove方法回收资源,否则会有内存泄漏的风险。因为在多线程环境下,区分线程是否相同,只能通过判断线程的pid/cid。一个线程在结束后,若不回收ThreadLocal中的资源,操作系统在启动新的线程任务时可能会复用之前的线程(使用一些线程池时就是如此),导致该线程的ThreadLocal中的资源没有被回收,而出现内存泄漏。因此用完一定记得执行remove方法。
- Go语言并发与并行学习笔记(一)
- 学习笔记(九)并发(三)
- 并发编程艺术学习笔记-02-Java并发机制的底层实现原理
- Go语言并发与并行学习笔记(一)
- Java并发学习笔记(八)-LinkedBlockingQueue
- Go语言学习笔记-并发
- 并发编程学习笔记之Lock与synchronized
- Java并发学习笔记(15)信号量(Semaphore) 关卡((2)CyclicBarrier)
- 学习笔记:java并发编程学习之初识Concurrent
- JAVA并发编程学习笔记之ReentrantLock (r)
- MySQL学习笔记之四:并发控制和事务机制
- Java并发编程学习笔记 深入理解volatile关键字的作用
- 学习JAVA多线程编程 --- 《JAVA多线程编程核心技术》第2章 对象及变量的并发访问 笔记
- WCF学习笔记之并发与限流
- Java并发读书学习笔记(九)——性能与可伸缩性
- Java并发读书学习笔记(十)——显式锁
- Java学习笔记—多线程(并发工具类)
- Java并发读书学习笔记(十一)——原子变量与非阻塞同步机制
- 并发编程实战学习笔记(七)——避免活跃性问题
- 并发编程实战学习笔记(八)——性能与可伸缩性