您的位置:首页 > 其它

ConcurrentHashMap 源码分析

2022-04-01 19:30 218 查看

每当你想要努力一把的时候,都是未来的你在求救!!!

1. 概述

1.1
HashMap
HashTable

HashMap
线程不安全

因为多线程环境下,使用

HashMap
进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用
HashMap
,如以下代码:

public class TestHashMap {
public static void main(String[] args) throws InterruptedException {
final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "thread" + i).start();
}
}
}, "thread");
t.start();
t.join();
}
}

HashTable
效率低下

HashTable
容器使用
synchronized
来保证线程安全,但在线程竞争激烈的情况下
HashTable
的效率非常低下。因为当一个线程访问
HashTable
的同步方法时,其他线程访问
HashTable
的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

1.2 锁分段技术

HashTable
容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问
HashTable
的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是
ConcurrentHashMap
所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

1.3
ConcurrentHashMap
的结构

ConcurrentHashMap
是由
Segment
数组结构和
HashEntry
数组结构组成。
Segment
是一种可重入锁
ReentrantLock
,在
ConcurrentHashMap
里扮演锁的角色,
HashEntry
则用于存储键值对数据。一个
ConcurrentHashMap
里包含一个
Segment
数组,
Segment
的结构和
HashMap
类似,是一种数组和链表结构, 一个
Segment
里包含一个
HashEntry
数组,每个
HashEntry
是一个链表结构的元素, 每个
Segment
守护者一个
HashEntry
数组里的元素,当对
HashEntry
数组的数据进行修改时,必须首先获得它对应的
Segment
锁。

2. 原理

建议看

ConcurrentHashMap
原理和源码之前,先去一下
HashMap
的原理和源码,这样会更轻松一点哦!

注意了,注意了,看了很久才发现,前面介绍

ConcurrentHashMap
的结构时存在一些问题:

  • JDK1.7
    中,
    ConcurrentHashMap
    的结构是由
    Segment
    数组结构和
    HashEntry
    数组结构组成的

  • JDK1.8
    中已经摒弃了
    Segment
    的概念,而是直接用
    Node数组+链表+红黑树
    的数据结构来实现,并发控制使用
    Synchronized
    CAS
    来操作,整个看起来就像是优化过且线程安全的
    HashMap
    ,虽然在
    JDK1.8
    中还能看到
    Segment
    的数据结构,但是已经简化了属性,只是为了兼容旧版本

    JDK1.8
    ConcurrentHashMap
    的结构和
    HashMap
    基本上一模一样,不知道的小伙伴去看看我之前写的
    HashMap
    源码解析吧!

我这里只看

JDK1.8
哦,至于和
JDK1.7
的区别,了解一下就可以吧~

3. 源码分析

3.1 构造方法分析

3.1.1 构造方法源码

HashMap
一样,构造方法只会初始化一些重要的变量,比如负载因子和容量。底层的数据结构(如桶数组)是延迟在
put
操作时进行初始化的。

// 构造方法 1
public ConcurrentHashMap() {
}

// 构造方法 2
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

// 构造方法 3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

// 构造方法 4
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)   // Use at least as many bins
initialCapacity = concurrencyLevel;   // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

// 构造方法 5
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

构造方法 1

是我们用的最多的,它里面什么都没做,初始化的一些变量是在

initTable
方法中做的,后面会详细说。

构造方法 2

指定了初始化容量,这里调用的

tableSizeFor
HashMap
中的一模一样,就不说了哈

构造方法 3

指定了初始化容量和负载因子,最终调用的是 构造方法 4,传入了默认的

concurrencyLevel

构造方法 4

看起来是指定了初始容量、负载因子和并发级别。

这个构造方法在

JDK1.8
中好像是为了兼容老版本的,因为在
initTable
方法中已经写死了负载因子就是0.75,在
JDK1.8
中也没有
concurrencyLevel
的概念了。

构造方法 5

将另一个 Map 中的映射拷贝一份到自己的存储结构中来,这个方法不是很常用。

3.1.2 重要参数源码
// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;

// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// forwarding nodes的hash值
static final int MOVED     = -1; // hash for forwarding nodes
// 树根节点的hash值
static final int TREEBIN   = -2; // hash for roots of trees
// ReservationNode的hash值
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();

//存放node的数组
transient volatile Node<K,V>[] table;

/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
3.1.3 数据结构

Node

Node 是

ConcurrentHashMap
存储结构的基本单元,用于存储数据

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// ...
}

TreeNode

TreeNode
继承与
Node
,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过
TreeNode
作为存储结构代替Node来转换成黑红树。

static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;  // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;    // needed to unlink next upon deletion
boolean red;
// ...
}

3.2 插入(put 方法)

public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// ConcurrentHashMap的键和值都不能为null
if (key == null || value == null) throw new NullPointerException();
// 获取 key 的Hash值,这里spread方法后位移,再进行与操作。可以去看一下HashMap中怎么做的,结合理解
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 第1个分支:ConcurrentHashMap 是在第一次put时进行初始化操作的
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 第2个分支:通过 tabAt 方法查询当前数组下标位置是否有值,如果没有值,就使用CAS将元素放进去
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;                   // no lock when adding to empty bin
}
// 第3个分支:如果在进行扩容,则先进行扩容操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 第4个分支:就是把元素放入槽内
else {
V oldVal = null;
//如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
synchronized (f) {
// 这里是看table数组下标为 i 的位置是否和 f 相同(好好看,上面已经给f赋值了)
// 如果相同,说明这个下表位置已经有数据了,要么是链表要么是红黑树
// 而且注意看tabAt方法,它是调用的sun.misc.Unsafe直接从内存中拿出来的数据,这样可以保证数据的内存可见性
if (tabAt(tab, i) == f) {
// 上面给 fh 赋值的是 f 的hash值,这里不懂的话好好看一下上面fh、f、和当前节点的关系
// fh>=0,表示当前table下标为 i 的位置是个链表
if (fh >= 0) {
// binCount 用于记录链表长度,后面判断转化为红黑树时会用到
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 将新put的节点添加到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果f是红黑树结构 ,就需要将节点 put 进红黑树了
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// TREEIFY_THRESHOLD 的默认值是 8,此时将链表转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 总元素个数累加1,统计size,并且检查是否需要扩容
addCount(1L, binCount);
return null;
}

上面

putVal
方法的 for 循环中,有四个分支:

  • 第1个分支,是整个数组的初始化
  • 第2个分支,是所在的槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回
  • 第3个分支,说明该槽正在进行扩容,帮助其扩容
  • 第4个分支,就是把元素放入槽内。槽内可能是一个链表,也可能是一棵红黑树,通过头节点的类型可以判断是哪一种。第4个分支是包裹在
    synchronized(f)
    里面的,f对应的数组下标位置的头节点,意味着每个数组元素有一把锁,并发度等于数组的长度

上面的

binCount
表示链表的元素个数,当这个数目超过
TREEIFY_THRESHOLD=8
时,把链表转换成红黑树,也就是
treeifyBin(tab,i)
方法。但在这个方法内部,不一定需要进行红黑树转换,可能只做扩容操作,后面讲扩容方法时就知道了。

第1个分支,是整个数组的初始化

上面可以看到,

ConcurrentHashMap
是在第一次put时进行初始化操作的,调用了
initTable()
方法,下面看一下这个方法:

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 就是当 table 还是空的时候,就开始进行初始化
if ((sc = sizeCtl) < 0)
// 这里如果忘记了 sizeCtl 的意义,就回到前面去看一下吧
// 当sizeCtl为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容
// 所以此时只让一个线程初始化,其他线程来了就自旋等待
Thread.yield(); // lost initialization race; just spin
// 这里就使用CAS将SIZECTL赋为 -1,代表正在初始化,这样其他线程才会自旋等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 上面判断时,已经将sizeCtl的值赋给 sc了
// 所以当sc为正数时:表示初始化或者下一次进行扩容的大小
// 这里计算了初始化ConcurrentHashMap 的数组大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>
;
table = tab = nt;
// sizeCtl不是数组长度,因此初始化成功后,就不再等于数组长度
// 而是n-(n>>>2)=0.75n,表示下一次扩容的阈值:n-n/4
sc = n - (n >>> 2);
}
} finally {
// 最后再将计算好的 sc 赋给 sizeCtl,以便于扩容时使用
sizeCtl = sc;
}
break;
}
}
return tab;
}

第3个分支:如果在进行扩容,则先进行扩容操作

在第三个分支调用了

helpTransfer
方法,这个方法是帮助从旧的 table 的元素复制到新的 table 中 。

其实

helpTransfer
方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 新的table nextTba已经存在前提下才能帮助扩容
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 调用扩容方法
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

这里面调用了扩容方法

transfer
,后面说扩容的时候再具体看吧。

put
方法的原理大致就是这样了,后面再看一下扩容的方法,这个就比较复杂了!

3.3 扩容

回到上面的

put
方法中,最后需要转化成红黑树时调用了
treeifyBin(tab, i)
方法,咱们就从这个方法开始看起:

private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 数组长度小于阈值64,不做红黑树转换,直接扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 链表转换为红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历链表,初始化红黑树
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//通过TreeBin对象对TreeNode转换成红黑树
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

在上面的代码中,

MIN_TREEIFY_CAPACITY=64
,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转换成红黑树。只有当数组长度大于或等于64时,才考虑把链表转换成红黑树。

上面调用了

tryPresize(n << 1)
方法,可以看到,扩容后的容量直接是当前容量的 2 倍(为什么是2倍,这好像在前面将
HashMap
的时候讲过)。下面就看一下
tryPresize
方法:

private final void tryPresize(int size) {
// 这里先计算扩容后的值,注意tableSizeFor和HashMap中的 tableSizeFor 一样
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 还记得吧:当sizeCtl为正数时,表示初始化或者下一次进行扩容的大小
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
// 这个分支就是初始化ConcurrentHashMap,和前面的 initTable 方法差不多
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>
;
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
// 如果当前数组长度已经达到最大(MAXIMUM_CAPACITY),或者将要扩容的容量小于等于sizeCtl
break;
else if (tab == table) {
// 扩容操作
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt; // 新的数组
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 扩容
transfer(tab, null);
}
}
}

下面就看一下扩容的核心方法

transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 计算步长
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) {            // initiating  // 初始化新的HashMap
try {
@SuppressWarnings("unchecked")
// 扩容两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {      // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 初始的transferIndex为旧HashMap的数组长度
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
此处,i为遍历下标,bound为边界。
如果成功获取一个任务,则i=nextIndex-1, bound=nextIndex-stride;
如果获取不到,则i=0,bound=0
*/
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
while (advance) {
int nextIndex, nextBound;
/*
以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一直while循环
目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务。
*/
if (--i >= bound || finishing)
// 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步。
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// transferIndex <= 0,整个HashMap完成
i = -1;
advance = false;
}
/*
对transferIndex执行CAS操作,即为当前线程分配1个stride。
CAS操作成功,线程成功获取到一个stride的迁移任务;
CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
*/
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i越界,整个HashMap遍历完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing表示整个HashMap扩容完成
if (finishing) {
nextTable = null;
// 将nextTab赋值给当前table
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// tab[i]迁移完毕,赋值一个ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// tab[i]的位置已经在迁移过程中
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对tab[i]进行迁移操作,tab[i]可能是一个链表或者红黑树
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
// 表示lastRun之后的所有元素,hash值都是一样的
// 记录下这个最后的位置
lastRun = p;
}
}
if (runBit == 0) {
// 链表迁移的优化做法
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树,迁移做法和链表类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

该方法非常复杂,下面一步步分析:

  1. 扩容的基本原理如下图,首先建一个新的

    HashMap
    ,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。所以,上面的方法参数有2个,第1个参数
    tab
    是扩容之前的
    HashMap
    ,第2个参数
    nextTab
    是扩容之后的
    HashMap
    。当
    nextTab=null
    的时候,方法最初会对
    nextTab
    进行初始化。

    这里有一个关键点要说明:该方法会被多个线程调用,所以每个线程只是扩容旧的

    HashMap
    部分,这就涉及如何划分任务的问题。

  2. 上图为多个线程并行扩容-任务划分示意图。旧数组的长度是N,每个线程扩容一段,一段的长度用变量stride(步长)来表示,

    transferIndex
    表示了整个数组扩容的进度。

    stride
    的计算公式如上面的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线程并行扩容,只需要1个线程来扩容整个数组;在多核模式下为
    (n>>>3)/NCPU
    ,并且保证步长的最小值是 16。显然,需要的线程个数约为
    n/stride

    transferIndex
    ConcurrentHashMap
    的一个成员变量,记录了扩容的进度。初始值为 n,从大到小扩容,每次减
    stride
    个位置,最终减至n<=0,表示整个扩容完成。因此,从
    [0,transferIndex-1]
    的位置表示还没有分配到线程扩容的部分,从
    [transfexIndex,n-1]
    的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。

    因为

    transferIndex
    会被多个线程并发修改,每次减
    stride
    ,所以需要通过
    CAS
    进行操作,如下面的代码所示。

  3. 在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的

    HashMap
    里面,有的还在旧的
    HashMap
    里面。这个时候,所有调用
    get(k,v)
    的线程还是会访问旧
    HashMap
    ,怎么处理呢?

    下图为扩容过程中的转发示意图:当

    Node[0]
    已经迁移成功,而其他
    Node
    还在迁移过程中时,如果有线程要读取
    Node[0]
    的数据,就会访问失败。为此,新建一个
    ForwardingNode
    ,即转发节点,在这个节点里面记录的是新的
    ConcurrentHashMap
    的引用。这样,当线程访问到
    ForwardingNode
    之后,会去查询新的
    ConcurrentHashMap

  4. 因为数组的长度

    tab.length
    是2的整数次方,每次扩容又是2倍。而
    Hash
    函数是
    hashCode%tab.length
    ,等价于
    hashCode&(tab.length-1)
    。这意味着:处于第i个位置的元素,在新的Hash表的数组中一定处于第i个或者第i+n个位置,如下图所示。举个简单的例子:假设数组长度是8,扩容之后是16:

    若hashCode=5,5%8=0,扩容后,5%16=0,位置保持不变;

    若hashCode=24,24%8=0,扩容后,24%16=8,后移8个位置;

    若hashCode=25,25%8=1,扩容后,25%16=9,后移8个位置;

    若hashCode=39,39%8=7,扩容后,39%8=7,位置保持不变;

    ......

    正因为有这样的规律,所以如下有代码:

    也就是把

    tab[i]
    位置的链表或红黑树重新组装成两部分,一部分链接到
    nextTab[i]
    的位置,一部分链接到
    nextTab[i+n]
    的位置,如上图所示。然后把
    tab[i]
    的位置指向一个
    ForwardingNode
    节点。

    同时,当

    tab[i]
    后面是链表时,使用类似于
    JDK 7
    中在扩容时的优化方法,从
    lastRun
    往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从
    lastRun
    往前的所有节点,需要依次拷贝。

    了解了核心的迁移函数

    transfer(tab,nextTab)
    ,再回头看
    tryPresize(int size)
    函数。这个函数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个函数,需要透彻地理解
    sizeCtl
    变量,下面这段注释摘自源码。

    sizeCtl=-1
    时,表示整个
    HashMap
    正在初始化;

    sizeCtl=某个其他负数
    时,表示多个线程在对
    HashMap
    做并发扩容;

    sizeCtl=cap
    时,
    tab=null
    ,表示未初始之前的初始容量(如上面的构造函数所示);

    扩容成功之后,

    sizeCtl
    存储的是下一次要扩容的阈值,即上面初始化代码中的
    n-(n>>>2)=0.75n

    所以,

    sizeCtl
    变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的
    tryPresize(int size)
    函数。

    tryPresize(int size)
    是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。在第一次扩容的时候,
    sizeCtl
    会被设置成一个很大的负数
    U.compareAndSwapInt(this,SIZECTL,sc,(rs << RESIZE_STAMP_SHIFT)+2)
    ;之后每一个线程扩容的时候,
    sizeCtl
    就加 1,
    U.compareAndSwapInt(this,SIZECTL,sc,sc+1)
    ,待扩容完成之后,
    sizeCtl
    减1。

这里看看

put
方法和扩容方法就行吧,其他方法就不记录了

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: