ConcurrentHashMap的JDK1.7实现
2017-08-18 22:41
218 查看
本篇文章主要介绍一下JDK1.7中的ConcurrentHashMap的一些代码结构。ConcurrentHashMap顾名思义就是线程安全的HashMap,相对于HashMap来说,可以保证线程安全的问题;对于HashTable来说可以提高存取的效率。
基本结构
从上述类图可以看出,ConcurrentHashMap除了Map通用的get()、put()等方法外,还有一个重要的属性Segment数组。每个Segment又是一个ReentrantLock,其内部又有一个HashEntry数组,HashEntry是最终key-value的存储单元。可以看出,HashEntry是一个链表结构。从而可以大致的得到如下结构图:
初始化
ConcurrentHashMap构造方法的3个入参分别是:initialCapacity、loadFactor、concurrencyLevel,用于初始化segment数组、segmentShift、segmentMask等数据。构造方法如下:
initialCapacity:ConcurrentHashMap的初始化容量,默认值是16(DEFAULT_INITIAL_CAPACITY);
loadFactor:负载因子,默认值是0.75f,当ConcurrentHashMap中的HashEntry数量到达 loadFactor * 当前容量 时,就会进行rehash()进行扩容;
concurrencyLevel:并发的级别,也就是Segment数组的大小(ConcurrentHashMap是对每个Segment加锁,因此得名并发级别),默认值是16(DEFAULT_CONCURRENCY_LEVEL)。这里需要注意的是concurrencyLevel的大小都是2^n,concurrencyLevel会取大于传入值的邻近的一个值。如:传入15,则concurrencyLevel=16(2^4)。
下面介绍一下构造函数中的初始化流程:
参数校验。
设置ssize的值,ssize是segments数组的大小,这里取的是大于concurrencyLevel的2^n的一个值。
设置segmentShift和segmentMask的值,sshift就是上面描述中的n值,默认情况下concurrencyLevel等于16,sshift就等于4。因此默认情况下segmentShift的值就是28,这个值会参与hash运算。segmentMask是hash运算的掩码,默认情况下等于16-1=15,类似于网络中的子网掩码,segmentMask的二进制最后几位都是1,最大值是末尾16个1(65535)。
初始化segment,其中cap是segment里面的HashEntry数组的长度。它取的是大于等于c(Map容量/ssize)的2^N的一个值。
创建segments和segments[0](这里面只初始化了一个segments数组中的第0个元素)。
定位Segment
ConcurrentHashMap使用分段锁Segment来保护不同的数据,下面简单的描述一下定位的流程。
第一步是获取key的hash值,代码如下:
常用方法
get()
get操作是先定位到segment,然后再到segment中去获取对应的value值。代码如下:
首先,根据Segment的索引((h >>> segmentShift) & segmentMask)算出在Segment[]上的偏移量。然后根据偏移量,调用UNSAFE.getObjectVolatile()判断其是否存在。
若Segment存在,则继续查找table[]的索引位置;根据table的索引((tab.length - 1) & h)算出在table[]上的偏移量,循环链表找出结果。
从上面步骤可以看出,整个过程中读取并未加锁。所以在读取的过程中,如果出现HashEntry的变更,则无法得到真正的结果。这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,就必须要加锁控制。
put()
对于put()操作,前面的定位Segment的操作都是和put()相同的。找到Segment以后,然后对整个Segment加锁,然后再进行后续的操作。下面详细介绍一下:
下面看一下具体的put方法,由Segment实现:
首先,segment尝试tryLock(),多次失败以后使用lock(),同时会查找HashEntry,如果没有找到,创建并返回一个(预热操作)。
然后,循环定位链表中的HashEntry位置。如果查找到key,若找到直接修改value值,并退出。如果找不到则在链表头部新增一个HashEntry节点。
在新增节点的之后,会检查是否会达到threshold,如果到达则进行rehash()扩容。
关于rehash()操作,后面单独介绍。下面介绍一下scanAndLockForPut()的实现。
这里首先尝试使用tryLock(),达到最大重试次数MAX_SCAN_RETRIES后,转为lock()的阻塞操作;
定位node位置的时候,如果找不到则创建一个HashEntry;
如果加锁过程中,node有新增,则重新遍历链表,(这里可以解释对于链表的插入位置总是head的问题了)。
rehash操作
rehash也就是扩容操作,扩容之后的容量是之前的两倍,所以扩容之后的newCapacity也是2^n的一个值。
首先计算出newCapacity的容量;
然后循环table[],重新分配每条链表上面的元素。因为使用的是 *2的方式扩容,每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。
拷贝过程中,如果为单链表则直接赋值;在节点拷贝的过程中,有一些节点的next节点是不用调整的(链表后端部分片段),就直接利用了;对于前端部分的片段,则重新hash,然后插入到对应的链表中。
最后再将需要put进来的node,在扩容后的结构中插入。
size()
ConcurrentHashMap的size()操作需要统计所有的Segment中的HashEntry数量,最大为Integer.MAX_VALUE。因为在统计个过程中,有可能出现多线程修改的问题。即便如此,ConcurrentHashMap首先会用无锁尝试3次,如果统计失败,再加锁统计。代码如下:
containsValue()方法的思想和size()的基本相同,下面贴出代码,相信查看备注即可。
参考:http://www.infoq.com/cn/articles/ConcurrentHashMap
基本结构
从上述类图可以看出,ConcurrentHashMap除了Map通用的get()、put()等方法外,还有一个重要的属性Segment数组。每个Segment又是一个ReentrantLock,其内部又有一个HashEntry数组,HashEntry是最终key-value的存储单元。可以看出,HashEntry是一个链表结构。从而可以大致的得到如下结构图:
初始化
ConcurrentHashMap构造方法的3个入参分别是:initialCapacity、loadFactor、concurrencyLevel,用于初始化segment数组、segmentShift、segmentMask等数据。构造方法如下:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 1、参数校验 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 2、ssize是segments数组的大小,这里取的是大于concurrencyLevel的2^n的一个值 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 3、sshift就是上面描述中的n值,默认情况下concurrencyLevel等于16,sshift就等于4。因此默认情况下segmentShift的值就是28,这个值会参与hash运算。 // segmentMask是hash运算的掩码,默认情况下等于16-1=15,类似于网络中的子网掩码,segmentMask的二进制最后几位都是1,最大值是末尾16个1(65535)。 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; // 4、下面的代码主要是用来初始化segment,其中cap是segment里面的HashEntry数组的长度。它取的是大于等于c(Map容量/ssize)的2^N的一个值。 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 5、创建segments和segments[0](这里面只初始化了一个segments数组中的第0个元素) Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
initialCapacity:ConcurrentHashMap的初始化容量,默认值是16(DEFAULT_INITIAL_CAPACITY);
loadFactor:负载因子,默认值是0.75f,当ConcurrentHashMap中的HashEntry数量到达 loadFactor * 当前容量 时,就会进行rehash()进行扩容;
concurrencyLevel:并发的级别,也就是Segment数组的大小(ConcurrentHashMap是对每个Segment加锁,因此得名并发级别),默认值是16(DEFAULT_CONCURRENCY_LEVEL)。这里需要注意的是concurrencyLevel的大小都是2^n,concurrencyLevel会取大于传入值的邻近的一个值。如:传入15,则concurrencyLevel=16(2^4)。
下面介绍一下构造函数中的初始化流程:
参数校验。
设置ssize的值,ssize是segments数组的大小,这里取的是大于concurrencyLevel的2^n的一个值。
设置segmentShift和segmentMask的值,sshift就是上面描述中的n值,默认情况下concurrencyLevel等于16,sshift就等于4。因此默认情况下segmentShift的值就是28,这个值会参与hash运算。segmentMask是hash运算的掩码,默认情况下等于16-1=15,类似于网络中的子网掩码,segmentMask的二进制最后几位都是1,最大值是末尾16个1(65535)。
初始化segment,其中cap是segment里面的HashEntry数组的长度。它取的是大于等于c(Map容量/ssize)的2^N的一个值。
创建segments和segments[0](这里面只初始化了一个segments数组中的第0个元素)。
定位Segment
ConcurrentHashMap使用分段锁Segment来保护不同的数据,下面简单的描述一下定位的流程。
第一步是获取key的hash值,代码如下:
private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // 将key的hash值打开到segment分段中去 // 使用了Wang/Jenkins hash的变体算法 h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }第二步,通过上面的散列算法得到的值,然后进行一下位移操作,取高位数值。
int index = (h >>> segmentShift) & segmentMask;默认情况下segmentShift为28,segmentMask为15(低位有1111),从而可以得到h的高四位的值。
常用方法
get()
get操作是先定位到segment,然后再到segment中去获取对应的value值。代码如下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 根据Segment的索引((h >>> segmentShift) & segmentMask)算出在Segment[]上的偏移量 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 若Segment存在,则继续查找table[]的索引位置; // 根据table的索引((tab.length - 1) & h)算出在table[]上的偏移量,循环链表找出结果 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }通过上面代码可以看出:
首先,根据Segment的索引((h >>> segmentShift) & segmentMask)算出在Segment[]上的偏移量。然后根据偏移量,调用UNSAFE.getObjectVolatile()判断其是否存在。
若Segment存在,则继续查找table[]的索引位置;根据table的索引((tab.length - 1) & h)算出在table[]上的偏移量,循环链表找出结果。
从上面步骤可以看出,整个过程中读取并未加锁。所以在读取的过程中,如果出现HashEntry的变更,则无法得到真正的结果。这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,就必须要加锁控制。
put()
对于put()操作,前面的定位Segment的操作都是和put()相同的。找到Segment以后,然后对整个Segment加锁,然后再进行后续的操作。下面详细介绍一下:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 定位Segment,并判断其是否存在 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);// 如果不存在,则创建;这是因为在构造ConcurrentHashMap的时候,只创建了segment[0]这个元素 return s.put(key, hash, value, false);// 提交给Segment去处理 }可以看出,首先定位Segment,并判断其是否存在;如果不存在,则创建(这是因为在构造ConcurrentHashMap的时候,只创建了segment[0]这个元素);然后再提交给Segment去处理。下面看一下创建Segment的过程:
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; // 定位Segment里面table的位置 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // 使用segment[0]作为原型创建,这里面免去了一些计算 int cap = proto.table.length; // 2^n的一个值 float lf = proto.loadFactor; // 默认0.75f int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 准备创建前,再去检查是否已经创建 // 使用CAS创建,直至成功 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }上述首先定位出了table在segment中的位置;然后使用segment[0]作为原型创建元素(这里面免去了一些计算);然后使用CAS创建,直至成功。
下面看一下具体的put方法,由Segment实现:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 尝试加锁,首先尝试tryLock(),多次失败以后使用lock();同时会查找HashEntry,如果没有找到,创建并返回一个 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); // 循环定位链表中的HashEntry位置,然后执行变更 for (HashEntry<K,V> e = first;;) { if (e != null) {// 查找key,若找到直接修改value值 K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else {// 到达链表尾部 if (node != null)// node如果不等于null,说明之前已经预热完成,可以直接插入 node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 检查是否到达threshold,到达后则进行rehash() if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }从上面Segment的put操作可以看出:
首先,segment尝试tryLock(),多次失败以后使用lock(),同时会查找HashEntry,如果没有找到,创建并返回一个(预热操作)。
然后,循环定位链表中的HashEntry位置。如果查找到key,若找到直接修改value值,并退出。如果找不到则在链表头部新增一个HashEntry节点。
在新增节点的之后,会检查是否会达到threshold,如果到达则进行rehash()扩容。
关于rehash()操作,后面单独介绍。下面介绍一下scanAndLockForPut()的实现。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // 定位node时为负值 while (!tryLock()) {// 这里首先尝试使用tryLock(),达到最大重试次数MAX_SCAN_RETRIES后,转为lock()的阻塞操作 HashEntry<K,V> f; // to recheck first below if (retries < 0) {// 定位node的位置,若找不到则创建一个HashEntry if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) {// 达到MAX_SCAN_RETRIES时,进行阻塞加锁 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // 如果加锁过程中,node有新增,则重新遍历链表,(这里可以解释对于链表的插入位置总是head的问题了) retries = -1; } } return node; }从上述代码可以看出,执行步骤大概如下:
这里首先尝试使用tryLock(),达到最大重试次数MAX_SCAN_RETRIES后,转为lock()的阻塞操作;
定位node位置的时候,如果找不到则创建一个HashEntry;
如果加锁过程中,node有新增,则重新遍历链表,(这里可以解释对于链表的插入位置总是head的问题了)。
rehash操作
rehash也就是扩容操作,扩容之后的容量是之前的两倍,所以扩容之后的newCapacity也是2^n的一个值。
private void rehash(HashEntry<K,V> node) { /* * 将table中每个节点重新分配到新的table中去。因为使用的是 *2的方式扩容, * 每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。 * 如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。 * 在节点拷贝的过程中,有一些节点的next节点是不用调整的,就直接利用了。 * 据统计,在默认的threshold值时, 扩容只需要1/6的节点需要拷贝。 * 那些被替换掉的节点,在没有任何线程引用的时候,将会被GC回收。 * Entry accesses use plain array indexing because they are followed by volatile table write. */ HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; // 容量*2操作 threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; // 遍历扩容前节点 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // 若链表为单节点 newTable[idx] = e; else { // 重复利用一些扩容后,next不变的节点,这些节点在原先链表的尾部 HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 对于next变化的节点重新计算hash(链表前面部分节点),然后重新插入 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // 将需要put的新node插入 node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }从上面的代码流程可以看出,其扩容步骤大致如下:
首先计算出newCapacity的容量;
然后循环table[],重新分配每条链表上面的元素。因为使用的是 *2的方式扩容,每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。
拷贝过程中,如果为单链表则直接赋值;在节点拷贝的过程中,有一些节点的next节点是不用调整的(链表后端部分片段),就直接利用了;对于前端部分的片段,则重新hash,然后插入到对应的链表中。
最后再将需要put进来的node,在扩容后的结构中插入。
size()
ConcurrentHashMap的size()操作需要统计所有的Segment中的HashEntry数量,最大为Integer.MAX_VALUE。因为在统计个过程中,有可能出现多线程修改的问题。即便如此,ConcurrentHashMap首先会用无锁尝试3次,如果统计失败,再加锁统计。代码如下:
public int size() { // 首先尝试3次无锁的统计,如果失败,再进入加锁统计 final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // 当大小超过 32 bits 时为true long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) {// 加锁统计 for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; // 尝试3次无锁统计,这里面通过统计前后的modCount值的和 变化,这个值在每个Segment中,每一次变更操作都会递增,类似于Segment的版本号 for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }containsValue()
containsValue()方法的思想和size()的基本相同,下面贴出代码,相信查看备注即可。
public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) {// 加锁统计 for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; // 尝试3次无锁统计,这里面通过统计前后的modCount值的和 变化,这个值在每个Segment中,每一次变更操作都会递增,类似于Segment的版本号 for (int j = 0; j < segments.length; ++j) { HashEntry<K,V>[] tab; Segment<K,V> seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; }
参考:http://www.infoq.com/cn/articles/ConcurrentHashMap
相关文章推荐
- MyEclipse10+Jdk1.7+OSGI+MySql实现CRUD数据库
- jdk1.7实现ftp上传文件
- jdk1.7 String switch的实现
- JDK1.7 Paths,Files类实现文件夹的复制与删除
- JDK1.7 Paths,Files类实现文件夹的复制与删除的实例
- MyEclipse10+Jdk1.7+OSGI+MySql实现数据库的增删改查
- JDK1.7以上javaFTP上传删除文件的实现方法
- MyEclipse10+Jdk1.7+OSGI+MySql实现数据库的增删改查
- ConcurrentHashMap源码剖析(JDK1.7实现,JDK1.8更换了实现)
- JDK1.7实现中关于中文编码方案的选取
- jdk1.7 String switch的实现
- jdk1.7 nio实现文件拷贝
- jdk1.7 String switch的实现
- JDK1.7中HashMap的常用方法实现原理
- JDK1.7 之java.nio.file.Files 读取文件仅需一行代码实现
- Python实现CET查分的方法
- 无线路由器通过有线扩展信号覆盖范围(非桥接方式),实现无缝漫游
- 二分查找法的循环与递归实现及时间复杂度分析
- 实现简单的迷宫
- 在Android中使用SharedPreferences存储数据以实现数据共享