您的位置:首页 > 其它

ConcurrentHashMap原理分析

2017-03-06 19:52 573 查看
1 使用方法

private final Map<String, Long> concurrent= new ConcurrentHashMap<>();

public long increase(String word) {

    Long oldValue = wordCounts.get(word);

    Long newValue = (oldValue == null) ? 1L : oldValue + 1;

    concurrent.put(word, newValue);

    return newValue;

}

2 原理分析

0)预热基础

JAVA存储模型(JMM)的Happens-Before规则

public class Test1 {
private int a=1, b=2;
public void foo(){  // 线程1
a=3;
b=4;
}
public int getA(){ // 线程2
return a;
}
public int getB(){ // 线程2
return b;
}
}
上面的代码,当线程1执行foo方法的时候,线程2访问getA和getB会得到什么样的结果?

A:a=1, b=2  // 都未改变

B:a=3, b=4  // 都改变了

C:a=3, b=2  //  a改变了,b未改变

D:a=1, b=4  //  b改变了,a未改变

一些不了解JMM的同学可能会问怎么可能 b=4语句会先于 a=3 执行?

1. Java编译器的重排序(Reording)操作有可能导致执行顺序和代码顺序不一致


假设代码有两条语句,代码顺序是语句1先于语句2执行;那么只要语句2不依赖于语句1的结果,打乱它们的顺序对最终的结果没有影响的话,那么真正交给CPU去执行时,他们的顺序可以是没有限制的。可以允许语句2先于语句1被CPU执行,和代码中的顺序不一致。

因为我们例子中的两条赋值语句,并没有依赖关系,无论谁先谁后结果都是一样的,所以就可能有Reordering的情况,这种情况下,对于其他线程来说就可能造成了可见性顺序不一致的问题。

2.从线程工作内存写回主存时顺序无法保证

下图描述了JVM中主存和线程工作内存之间的交互,线程在修改一个变量时,先拷贝入线程工作内存中,在线程工作内存修改后再写回主存(Main Memery)中




假设例子中Reording后顺序仍与代码中的顺序一致,那么接下来呢?有意思的事情就发生在线程把Working Copy Memery中的变量写回Main Memery的时刻。线程1把变量写回Main Memery的过程对线程2的可见性顺序也是无法保证的。

上面的列子,a=3; b=4; 这两个语句在 Working Copy Memery中执行后,写回主存的过程对于线程2来说同样可能出现先b=4;后a=3;这样的相反顺序。

JMM中一个重要问题就是:如何让多线程之间,对象的状态对于各线程的“可视性”是顺序一致的。它的解决方式就是 Happens-before 规则:

要想保证执行动作B的线程看到动作A的结果(无论A和B是否发生在同一个线程中),A和B之间就必须满足happens-before关系

实现happens-before的方法:

1.锁



1.5之前Java中的锁只有最基本的synchronized,它是一种互斥的实现方式。在Java5之后,增加了一些其它锁,比如ReentrantLock,它基本作用和synchronized相似,但提供了更多的操作方式,比如在获取锁时不必像synchronized那样只是傻等,可以设置定时,轮询,或者中断,这些方法使得它在获取多个锁的情况可以避免死锁操作。

而我们需要了解的是ReentrantLock的性能相对synchronized来说有很大的提高。(不过Java6后对synchronized进行了优化,两者已经接近了。)在ConcurrentHashMap中,每个hash区间使用的锁正是ReentrantLock,1.8后ConcurrentHashMap使用的又是synchronized了

2.volatile

在Java5之前,JMM对Volatile的定义是:保证读写volatile都直接发生在main memory中,线程的working memory不进行缓存。它只承诺了读和写过程的可见性,并没有对Reording做限制,所以旧的Volatile并不太可靠。在Java5之后,JMM对volatile的语义进行了增强。就是我们看到的 volatile变量法则。

但是,volatile对于多线程,不是一种互斥(mutex)关系。用volatile修饰的变量,不能保证该变量状态的改变对于其他线程来说是一种“原子化操作,例如

private static volatile int nextSerialNum = 0;
public static int generateSerialNumber(){
return nextSerialNum++;
}


上面代码中对nextSerialNum使用了volatile来修饰,根据前面“Happens-Before”法则的第三条Volatile变量法则,看似不同线程都会得到一个新的serialNumber

问题出在了 nextSerialNum++ 这条语句上,它不是一个原子化的,实际上是read-modify-write三项操作,这就有可能使得在线程1在write之前,线程2也访问到了nextSerialNum,造成了线程1和线程2得到一样的serialNumber。

所以,在使用Volatile时,需要注意

a)  需不需要互斥;

b) 对象状态的改变是不是原子化的

不变模式(immutable)是多线程安全里最简单的一种保障方式。因为你拿他没有办法,想改变它也没有机会。
不变模式主要通过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不需要同步就能自由地被访问和共享

总结:happens-before法则,可见性volatile,原子性,不变模式

1)ConcurrentHashMap的锁分段技术

思想:细颗粒化锁机制,和JVM中CMS收集器,android安全机制selinux类似

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。



2)ConcurrentHashMap的结构






ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,
每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。



3)ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}


initialCapacity表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16

concurrencyLevel表示并发级别,这个值用来确定Segment的个数,比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16

loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f

初始化segments数组

初始化每个Segment。输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment

Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];


定位相应的segments

final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

3)ConcurrentHashMap的get操作

Segment的get操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到segment,再通过哈希算法定位到元素

public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

它没有使用同步控制,交给segment去找,再看Segment中的get方法:

V get(Object key, int hash) {
if (count != 0) { // read-volatile // ①
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)  // ② 注意这里
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}


它也没有使用锁来同步,只是判断获取的entry的value是否为null,为null时才使用加锁的方式再次去获取,这个实现很微妙,没有锁同步的话,靠什么保证同步呢?我们一步步分析

第一步,先判断一下 count != 0;count变量表示segment中存在entry的个数。如果为0就不用找了。count变量的定义:

transient volatile int count;
Java5之后,JMM实现了对volatile的保证:对volatile域的写入操作happens-before于每一个后续对同一个域的读写操作。

所以,每次判断count变量的时候,即使恰好其他线程改变了segment也会体现出来。

第二步,获取到要该key所在segment中的索引地址,如果该地址有相同的hash对象,顺着链表一直比较下去找到该entry。当找到entry的时候,先做了一次比较: if(v != null) 这是为何呢?考虑一下,如果这个时候,另一个线程恰好新增/删除了entry,或者改变了entry的value,会如何?

先看一下HashEntry类结构

static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
。。。
}
除了 value,其它成员都是final修饰的,也就是说value可以被改变,其它都不可以改变,包括指向下一个HashEntry的next也不能被改变。

1. 在get代码的①和②之间,另一个线程新增了一个entry

下图大致描述了put 一个新的entry的过程



因为每个HashEntry中的next也是final的,没法对链表最后一个元素增加一个后续entry所以新增一个entry的实现方式只能通过头结点来插入了

newEntry对象是通过 new HashEntry(K k , V v, HashEntry next) 来创建的。如果另一个线程刚好new 这个对象时,当前线程来get它。因为没有同步,就可能会出现当前线程得到的newEntry对象是一个没有完全构造好的对象引用,可能出现v==null(get和put没有同步过程,所以put加锁后对get没有互斥效果)

所以才需要判断一下:if (v != null) 如果确实是一个不完整的对象,则使用锁的方式再次get一次

有没有可能会put进一个value为null的entry? 不会的,已经做了检查,这种情况会抛出异常,所以 ②处的判断完全是出于对多线程下访问一个new出来的对象的状态检测。

2.在get代码的①和②之间,另一个线程修改了一个entry的value

value是用volitale修饰的,可以保证读取时获取到的是修改后的值

3. 在get代码的①之后,另一个线程删除了一个entry


假设我们的链表元素是:e1-> e2 -> e3 -> e4 我们要删除 e3这个entry,因为HashEntry中next的不可变,所以我们无法直接把e2的next指向e4,而是将要删除的节点之前的节点复制一份,形成新的链表

它的实现大致如下图所示:



如果我们get的也恰巧是e3,可能我们顺着链表刚找到e1,这时另一个线程就执行了删除e3的操作,而我们线程还会继续沿着旧的链表找到e3返回。这里没有办法实时保证了。

我们第①处就判断了count变量,它保障了在 ①处能看到其他线程修改后的。①之后到②之间,如果再次发生了其他线程再删除了entry节点,就没法保证看到最新的了。

不过这也没什么关系,即使我们返回e3的时候,它被其他线程删除了,暴漏出去的e3也不会对我们新的链表造成影响,因为每个entry的成员除了value都是final修饰的,暴漏出去也不会对其他元素造成影响

这其实是一种乐观设计,设计者假设 ①之后到②之间 发生被其它线程增、删、改的操作可能性很小,所以不采用同步设计,而是采用了事后(其它线程这期间也来操作,并且可能发生非安全事件)弥补的方式。

而因为其他线程的“改”和“删”对我们的数据都不会造成影响,所以只有对“新增”操作进行了安全检查,就是②处的非null检查,如果确认不安全事件发生,则采用加锁的方式再次get

总结:get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?

原因是它的get方法里将要使用的共享变量都定义成volatile,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen
before原则
对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景

4)ConcurrentHashMap的Put操作

public V put(Object key) {
hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, null);
}


ConcurrentHashMap的Put操作也是掉起了segment的put操作

再来回顾一下HashEntry

static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
......
}


因为每个HashEntry中的next也是final的,没法对链表最后一个元素增加一个后续entry所以新增一个entry的实现方式只能通过头结点来插入了。如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。



由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。

是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容

5)ConcurrentHashMap的remove操作


public V remove(Object key) {
hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);
}
整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现

整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。

删除元素之前



删除元素3之后:



整个remove实现并不复杂,但是需要注意如下几点。

第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。

第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化

6)ConcurrentHashMap的size操作

如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。

因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

参考:http://ifeve.com/concurrenthashmap/

参考:http://blog.csdn.net/seapeak007/article/details/53409618
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: