您的位置:首页 > 职场人生

深入解析 ConcurrentHashMap 实现内幕,吊打面试官,没问题

2019-12-09 16:30 1001 查看

在开发中,我们经常使用 HashMap 容器来存储 K-V 键值对,但是在并发多线程的情况下,HashMap 容器又是不安全的,因为在 put 元素的时候,如果触发扩容操作,也就是 rehash ,就会将原数组的内容重新 hash 到新的扩容数组中,但是在扩容这个过程中,其他线程也在进行 put 操作,如果这两个元素 hash 值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的

那有没有安全的 Map 容器呢?有的,目前 JDK 中提供了三种安全的 Map 容器:

  • HashTable
  • Collections.SynchronizedMap(同步包装器提供的方法)
  • ConcurrentHashMap

先来看看前两种容器,它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。Hashtable 是在 put、get、size 等各种方法加上“synchronized” 锁来保证安全,这就导致了所有并发操作都要竞争同一把锁,一个线程在进行同步操作时,其他线程只能等待,大大降低了并发操作的效率。

再来看看 Collections 提供的同步包装器 SynchronizedMap ,我们可以先来看看 SynchronizedMap 的源代码:

private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;

private final Map<K,V> m;     // Backing Map
final Object      mutex;        // Object on which to synchronize

SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}

SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
......
}

从源码中,我们可以看出 SynchronizedMap 虽然方法没有加 synchronized 锁,但是利用了“this”作为互斥的 mutex,所以在严格意义上 SynchronizedMap 跟 HashTable 一样,并没有实际的改进。

第三个 ConcurrentHashMap 也是这篇文章的主角,它相对前两种安全的 Map 容器来说,在设计和思想上有较大的变化,也极大的提高了 Map 的并发效率。就 ConcurrentHashMap 容器本身的实现来说,版本之间就会产生较大的差异,典型的就是 JDK1.7 和 JDK1.8 这两个版本,可以说是发生了翻天覆地的变化,在本文中也会介绍这两个版本的 ConcurrentHashMap 实现,主要的重点放在 JDK 1.8 版本上,我个人觉得 JDK 1.7 已经成为了过去式,没必要深入研究。

ConcurrentHashMap 在 JDK 1.7 中的实现

在 JDK 1.7 版本及之前的版本中,ConcurrentHashMap 为了解决 HashTable 会锁住整个 hash 表的问题,提出了分段锁的解决方案,分段锁就是将一个大的 hash 表分解成若干份小的 hash 表,需要加锁时就针对小的 hash 表进行加锁,从而来提升 hash 表的性能。JDK1.7 中的 ConcurrentHashMap 引入了 Segment 对象,将整个 hash 表分解成一个一个的 Segment 对象,每个 Segment 对象呢可以看作是一个细粒度的 HashMap。

Segment 对象继承了 ReentrantLock 类,因为 Segment 对象它就变成了一把锁,这样就可以保证数据的安全。 在 Segment 对象中通过 HashEntry 数组来维护其内部的 hash 表。每个 HashEntry 就代表了 map 中的一个 K-V,如果发生 hash 冲突时,在该位置就会形成链表。

JDK1.7 中,ConcurrentHashMap 的整体结构可以描述为下图的样子:

我们对 ConcurrentHashMap 最关心的地方莫过于如何解决 HashMap put 时候扩容引起的不安全问题?一起来看看 JDK1.7 中 ConcurrentHashMap 是如何解决这个问题的,我们先从 put 方法开始:

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保证数据的分散性,避免哈希冲突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
// Unsafe 调用方式,直接获取相应的 Segment
if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

在 put 方法中,首先是通过二次哈希减小哈希冲突的可能行,根据 hash 值以 Unsafe 调用方式,直接获取相应的 Segment,最终将数据添加到容器中是由 segment对象的 put 方法来完成。Segment对象的 put 方法源代码如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 无论如何,确保获取锁 scanAndLockForPut会去查找是否有key相同Node
ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
ConcurrentHashMap.HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
// 更新已存在的key
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

由于 Segment 对象本身就是一把锁,所以在新增数据的时候,相应的 Segment对象块是被锁住的,其他线程并不能操作这个 Segment 对象,这样就保证了数据的安全性,在扩容时也是这样的,在 JDK1.7 中的 ConcurrentHashMap扩容只是针对 Segment 对象中的 HashEntry 数组进行扩容,还是因为 Segment 对象是一把锁,所以在 rehash 的过程中,其他线程无法对 segment 的 hash 表做操作,这就解决了 HashMap 中 put 数据引起的闭环问题

关于 JDK1.7 中的 ConcurrentHashMap 就聊这么多,我们只需要直到在 JDK1.7 中 ConcurrentHashMap 采用分段锁的方式来解决 HashMap 不安全问题。

ConcurrentHashMap 在 JDK1.8 中的实现

在 JDK1.8 中 ConcurrentHashMap 又发生了翻天覆地的变化,从实现的代码量上就可以看出来,在 1.7 中不到 2000行代码,而在 1.8 中已经 6000多行代码了 。废话不多说,我们来看看有那些变化。

先从容器安全说起,在容器安全上,1.8 中的 ConcurrentHashMap 放弃了 JDK1.7 中的分段技术,而是采用了 CAS 机制 + synchronized 来保证并发安全性,但是在 ConcurrentHashMap 实现里保留了 Segment 定义,这仅仅是为了保证序列化时的兼容性而已,并没有任何结构上的用处。 这里插播个 CAS 机制的知识点:

CAS 机制

CAS 典型的应用莫过于 AtomicInteger 了,CAS 属于原子操作的一种,能够保证一次读写操作是原子的。CAS 通过将内存中的值与期望值进行比较,只有在两者相等时才会对内存中的值进行修改,CAS 是在保证性能的同时提供并发场景下的线程安全性。在 Java 中 CAS 实现位于 sun.misc.Unsafe 类中,该类中定义了大量的 native 方法,CAS 的实现有以下几个方法:

public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

我们只能看到定义,并不能看到具体的实现,具体的实现依赖于操作系统,我们就不去管这些了,简单了解方法里面的参数是啥意思就行了:

  • o :目标操作对象
  • offset :目标操作数内存偏移地址
  • expected :期望值
  • x :更新值

CAS 机制虽然无需加锁、安全且高效,但也存在一些缺点,概括如下:

  • 循环检查的时间可能较长,不过可以限制循环检查的次数
  • 只能对一个共享变量执行原子操作
  • 存在 ABA 问题(ABA 问题是指在 CAS 两次检查操作期间,目标变量的值由 A 变为 B,又变回 A,但是 CAS 看不到这中间的变换,对它来说目标变量的值并没有发生变化,一直是 A,所以 CAS 操作会继续更新目标变量的值。)

在存储结构上,JDK1.8 中 ConcurrentHashMap 放弃了 HashEntry 结构而是采用了跟 HashMap 结构非常相似,采用 Node 数组加链表(链表长度大于8时转成红黑树)的形式,Node 节点设计如下:

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...省略...
}

跟 HashMap 一样 Key 字段被 final 修饰,说明在生命周期内,key 是不可变的, val 字段被 volatile 修饰了,这就保证了 val 字段的可见性。

JDK1.8 中的 ConcurrentHashMap 结构如下图所示:

在这里我提一下 ConcurrentHashMap 默认构造函数,我觉得这个地方比较有意思,ConcurrentHashMap 的默认构造函数如下:

public ConcurrentHashMap() {
}

发现没这个构造函数啥事没干,为啥要这样设计?这样做的好处是实现了懒加载(lazy-load 形式),有效避免了初始化的开销,这也是 JDK1.7 中ConcurrentHashMap 被很多人抱怨的地方。

结构上的变化就聊上面的两点,跟上面一样,我们还是来看看我们关心的问题,如何解决 HashMap 扩容时不安全的问题,带着这个问题来阅读 ConcurrentHashMap 的源代码,关于 ConcurrentHashMap 的源代码,在本文中主要聊新增(putVal )和扩容(transfer )这两个方法,其他方法就不在一一介绍了。

putVal 方法

ConcurrentHashMap 新增元素并不是直接调用 putVal 方法,而是使用 put 方法

public V put(K key, V value) {
return putVal(key, value, false);
}

但是 put 方法调用了 putVal 方法,换一句话来说就是 putVal 是具体的新增方法,是 put 方法的具体实现,在 putVal 方法源码加上了注释,具体代码如下:

final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果 key 为空,直接返回
if (key == null || value == null) throw new NullPointerException();
// 两次 hash ,减少碰撞次数
int hash = spread(key.hashCode());
// 记录链表节点得个数
int binCount = 0;
// 无条件得循环遍历整个 node 数组,直到成功
for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
ConcurrentHashMap.Node<K,V> f; int n, i, fh;
// lazy-load 懒加载的方式,如果当前 tab 容器为空,则初始化 tab 容器
if (tab == null || (n = tab.length) == 0)
tab = initTable();

// 通过Unsafe.getObjectVolatile()的方式获取数组对应index上的元素,如果元素为空,则直接无所插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//// 利用CAS去进行无锁线程安全操作
if (casTabAt(tab, i, null,
new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
break;                   // no lock when adding to empty bin
}
// 如果 fh == -1 ,说明正在扩容,那么该线程也去帮扩容
else if ((fh = f.hash) == MOVED)
// 协作扩容操作
tab = helpTransfer(tab, f);
else {
// 如果上面都不满足,说明存在 hash 冲突,则使用 synchronized 加锁。锁住链表或者红黑树的头结点,来保证操作安全
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {

if (fh >= 0) {// 表示该节点是链表
binCount = 1;
// 遍历该节点上的链表
for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
ConcurrentHashMap.Node<K,V> pred = e;
if ((e = e.next) == null) {//插入链表尾部
pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof ConcurrentHashMap.TreeBin) {// 该节点是红黑树节点
ConcurrentHashMap.Node<K,V> p;
binCount = 2;
if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 插入完之后,判断链表长度是否大于8,大于8就需要转换为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果存在相同的key ,返回原来的值
if (oldVal != null)
return oldVal;
break;
}
}
}
//统计 size,并且检测是否需要扩容
addCount(1L, binCount);
return null;
}

在源码中有比较详细的注释,如果你想了解详细的实现,可以逐行读源码,在这里我们来对 putVal 方法做一个总结,putVal 方法主要做了以下几件事:

  • 第一步、在 ConcurrentHashMap 中不允许 key val 字段为空,所以第一步先校验key value 值,key、val 两个字段都不能是 null 才继续往下走,否则直接返回一个 NullPointerException 错误,这点跟 HashMap 有区别,HashMap 是可以允许为空的。
  • 第二步、判断容器是否初始化,如果容器没有初始化,则调用 initTable 方法初始化,initTable 方法如下:
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 负数表示正在初始化或扩容,等待
if ((sc = sizeCtl) < 0)
// 自旋等待
Thread.yield(); // lost initialization race; just spin
// 执行 CAS 操作,期望将 sizeCtl 设置为 -1,-1 是正在初始化的标识
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS 抢到了锁
try {
// 对 table 进行初始化,初始化长度为指定值,或者默认值 16
if ((tab = table) == null || tab.length == 0) {
// sc 在初始化的时候用户可能会自定义,如果没有自定义,则是默认的
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>
;
table = tab = nt;
// 指定下次扩容的大小,相当于 0.75 × n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

Table 本质上就是一个 Node 数组,其初始化过程也就是对 Node 数组的初始化过程,方法中使用了 CAS 策略执行初始化操作。初始化流程为:

1、判断 sizeCtl 值是否小于 0,如果小于 0 则表示 ConcurrentHashMap 正在执行初始化操作,所以需要先等待一会,如果其它线程初始化失败还可以顶替上去
2、如果 sizeCtl 值大于等于 0,则基于 CAS 策略抢占标记 sizeCtl 为 -1,表示 ConcurrentHashMap 正在执行初始化,然后构造 table,并更新 sizeCtl 的值

  • 第三步、根据双哈希之后的 hash 值找到数组对应的下标位置,如果该位置未存放节点,也就是说不存在 hash 冲突,则使用 CAS 无锁的方式将数据添加到容器中,并且结束循环。
  • 第四步、如果并未满足第三步,则会判断容器是否正在被其他线程进行扩容操作,如果正在被其他线程扩容,则放弃添加操作,加入到扩容大军中(ConcurrentHashMap 扩容操作采用的是多线程的方式,后面我们会讲到),扩容时并未跳出死循环,这一点就保证了容器在扩容时并不会有其他线程进行数据添加操作,这也保证了容器的安全性
  • 第五步、如果 hash 冲突,则进行链表操作或者红黑树操作(如果链表树超过8,则修改链表为红黑树),在进行链表或者红黑树操作时,会使用 synchronized 锁把头节点被锁住了,保证了同时只有一个线程修改链表,防止出现链表成环
  • 第六步、进行 addCount(1L, binCount) 操作,该操作会更新 size 大小,判断是否需要扩容,addCount 方法源码如下:
// X传入的是1,check 传入的是 putVal 方法里的 binCount,没有hash冲突的话为0,冲突就会大于1
private final void addCount(long x, int check) {
ConcurrentHashMap.CounterCell[] as; long b, s;
// 统计ConcurrentHashMap里面节点个数
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
ConcurrentHashMap.CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// check就是binCount,binCount 最小都为0,所以这个条件一定会为true
if (check >= 0) {
ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc;
// 这儿是自旋,需同时满足下面的条件
// 1. 第一个条件是map.size 大于 sizeCtl,也就是说需要扩容
// 2. 第二个条件是`table`不为null
// 3. 第三个条件是`table`的长度不能超过最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 该判断表示已经有线程在进行扩容操作了
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}

addCount 方法做了两个工作:
1、对 map 的 size 加一
2、检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

扩容 transfer 方法

扩容 transfer 方法是一个非常牛逼的方法,在看具体的 transfer 源码之前,我们先来了解一下什么时候会触发扩容操作,不出意外的话,以下两种情况下可能触发扩容操作

  • 调用 put 方法新增元素之后,会调用 addCount 方法来更新 size 大小,并检查是否需要进行扩容,当数组元素个数达到阈值时,会触发transfer方法

  • 触发了 tryPresize 操作, tryPresize 操作会触发扩容操作,有两种情况会触发 tryPresize 操作: 第一种情况:当某节点的链表元素个数达到阈值 8 时,这时候需要将链表转成红黑树,在结构转换之前会,会先判断数组长度 n 是否小于阈值MIN_TREEIFY_CAPACITY,默认是64,如果小于则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。
  • 第二种情况:在 putAll 操作时会先触发 tryPresize 操作。

tryPresize 方法源码如下:

好了,知道什么时候会触发扩容后,我们来看看 扩容 transfer 方法的源码,这也是一块硬骨头,非常难啃,希望我可以尽量的把它讲清楚,transfer 方法源码如下:

private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 多线程扩容,每核处理的量小于16,则强制赋值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab 为空,先实例化一个新的数组
if (nextTab == null) {            // initiating
try {
@SuppressWarnings("unchecked")
// 新数组的大小是原来的两倍
ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {      // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新成员变量
nextTable = nextTab;
// 更新转移下标,就是 老的 tab 的 length
transferIndex = n;
}
// bound :该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
// advance: 该参数
int nextn = nextTab.length;
// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
// advance 变量指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
boolean advance = true;
// 完成状态,如果是 true,表示扩容结束
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
ConcurrentHashMap.Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 这儿多判断一次,是否为了防止可能出现的remove()操作
if (tabAt(tab, i) == f) {
// 旧链表上该节点的数据,会被分成低位和高位,低位就是在新链表上的位置跟旧链表上一样,
// 高位就是在新链表的位置是旧链表位置加上旧链表的长度
ConcurrentHashMap.Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
ConcurrentHashMap.Node<K,V> lastRun = f;
for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 该节点哈希值与旧链表长度与运算,结果为0,则在低位节点上,反之,在高位节点上
if ((ph & n) == 0)
ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
else
hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置处插上链表
setTabAt(nextTab, i + n, hn);
// 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ConcurrentHashMap.TreeBin) {
// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
// 红黑树的逻辑跟节点一模一样,最后也会分高位和低位
ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

想知道具体的实现细节,请逐行读源码,如果遇到不懂得,欢迎留言交流,跟 putVal 方法一样,我们同样来对 transfer 方法进行总结,transfer 大致做了以下几件事件:

  • 第一步:计算出每个线程每次可以处理的个数,根据 Map 的长度,计算出每个线程(CPU)需要处理的桶(table数组的个数),默认每个线程每次处理 16 个桶,如果小于 16 个,则强制变成 16 个桶。
  • 第二步:对 nextTab 初始化,如果传入的新 table nextTab 为空,则对 nextTab 初始化,默认是原 table 的两倍
  • 第三步:引入 ForwardingNode、advance、finishing 变量来辅助扩容,ForwardingNode 表示该节点已经处理过,不需要在处理,advance 表示该线程是否可以下移到下一个桶(true:表示可以下移),finishing 表示是否结束扩容(true:结束扩容,false:未结束扩容) ,具体的逻辑就不说了
  • 第四步:跳过一些其他细节,直接到数据迁移这一块,在数据转移的过程中会加 synchronized 锁,锁住头节点,同步化操作,防止 putVal 的时候向链表插入数据
  • 第五步:进行数据迁移,如果这个桶上的节点是链表或者红黑树,则会将节点数据分为低位和高位,计算的规则是通过该节点的 hash 值跟为扩容之前的 table 容器长度进行位运算(&),如果结果为 0 ,则将数据放在新表的低位(当前 table 中为 第 i 个位置,在新表中还是第 i 个位置),结果不为 0 ,则放在新表的高位(当前 table 中为第 i 个位置,在新表中的位置为 i + 当前 table 容器的长度)

  • 第六步:如果桶挂载的是红黑树,不仅需要分离出低位节点和高位节点,还需要判断低位和高位节点在新表以链表还是红黑树的形式存放。

还是那句话扩容 transfer 方法很难,希望我把它讲清楚了,关于其他 ConcurrentHashMap 的方法在这里就不聊了,ConcurrentHashMap 博大精深,要把它研究透还需要时日,好了,这就是我对 ConcurrentHashMap 实现内幕的分享,希望本文的内容对大家的学习或者工作能带来一定的帮助,感谢大家的支持。

最后

目前互联网上很多大佬都有 ConcurrentHashMap 相关文章,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。

欢迎扫码关注微信公众号:「平头哥的技术博文」,和平头哥一起学习,一起进步。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐