Java并发编程中并发容器ConcurrentHashMap和CopyOnWriteArrayList
2016-08-11 13:52
465 查看
简介
JDK 5 中加入的ConcurrentHashMap:用来替代同步且基于散列的Map
CopyOnWriteArrayList:用于在遍历操作为主要操作的情况下替代同步的List
ConcurrentMap:“若没有则添加”、替换、有条件删除
Queue:用来临时保存一组待处理的元素,Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。虽然可以用List来模拟Queue的行为--事实上,正是通过LinkedList来实现Queue的,但还需要一个Queue的类,因为它能去掉List的随机访问需求,从而实现更高效的并发。
Queue的几种实现:
ConcurrentLinkedQueue:传统的先进先出队列
PriorityQueue:(非并发的)优先队列
BlockingQueue:增加了可阻塞的插入和获取操作
JDK 6 中加入的
ConcurrentSkipListMap:替代同步的SortedMap
ConcurrentSkipListSet:替代同步的SortedSet
ConcurrentHashMap
ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为“分段锁”。在这种机制中,任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap与其它并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
ConcurrentHashMap略微减弱了size和isEmpty方法以反映容器的并发特性,以换取对其它更重要操作的性能优化,包括get、put、containsKey和remove等。
在ConcurrentHashMap中没有实现对Map加锁以提供独占访问。在HashTable和SynchronizedMap中,获得Map的锁能防止其它线程访问这个Map。在一些不常见的情况中需要这种功能,例如通过原子方式添加一些映射,或者对Map迭代若干次并在此期间保持元素顺序相同。
ConcurrentHashMap源码为:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { static final int DEFAULT_INITIAL_CAPACITY = 16; static final float DEFAULT_LOAD_FACTOR = 0.75f; static final int DEFAULT_CONCURRENCY_LEVEL = 16; static final int MAXIMUM_CAPACITY = 1 << 30; static final int MIN_SEGMENT_TABLE_CAPACITY = 2; static final int MAX_SEGMENTS = 1 << 16; // slightly conservative static final int RETRIES_BEFORE_LOCK = 2; private static class Holder { static final boolean ALTERNATIVE_HASHING; static { // Use the "threshold" system property even though our threshold // behaviour is "ON" or "OFF". String altThreshold = java.security.AccessController.doPrivileged( new sun.security.action.GetPropertyAction( "jdk.map.althashing.threshold")); int threshold; try { threshold = (null != altThreshold) ? Integer.parseInt(altThreshold) : Integer.MAX_VALUE; // disable alternative hashing if -1 if (threshold == -1) { threshold = Integer.MAX_VALUE; } if (threshold < 0) { throw new IllegalArgumentException("value must be positive integer."); } } catch(IllegalArgumentException failed) { throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed); } ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY; } } private transient final int hashSeed = randomHashSeed(this); private static int randomHashSeed(ConcurrentHashMap instance) { if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) { return sun.misc.Hashing.randomHashSeed(instance); } return 0; } final int segmentMask; final int segmentShift; final Segment<K,V>[] segments; transient Set<K> keySet; transient Set<Map.Entry<K,V>> entrySet; transient Collection<V> values; static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); } static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); } private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } static final class Segment<K,V> extends ReentrantLock implements Serializable { static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; int retries = -1; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } } final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; } final boolean replace(K key, int hash, V oldValue, V newValue) { if (!tryLock()) scanAndLock(key, hash); boolean replaced = false; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { if (oldValue.equals(e.value)) { e.value = newValue; ++modCount; replaced = true; } break; } } } finally { unlock(); } return replaced; } final V replace(K key, int hash, V value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value; ++modCount; break; } } } finally { unlock(); } return oldValue; } final void clear() { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null); ++modCount; count = 0; } finally { unlock(); } } } // Accessing segments @SuppressWarnings("unchecked") static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } @SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } // Hash-based segment and entry accesses @SuppressWarnings("unchecked") private Segment<K,V> segmentForHash(int h) { long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) { HashEntry<K,V>[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); } <span style="color:#ff0000;"> //其它构造函数里面都调用了这个构造函数 </span> @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } if (sum != 0L) return false; } return true; } public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; } public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } @SuppressWarnings("unchecked") public boolean containsKey(Object key) { Segment<K,V> s; // same as get() except no need for volatile value read HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return true; } } return false; } public boolean containsValue(Object value) { // Same idea as size() if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; boolean found = false; long last = 0; int retries = -1; try { outer: for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } long hashSum = 0L; int sum = 0; for (int j = 0; j < segments.length; ++j) { HashEntry<K,V>[] tab; Segment<K,V> seg = segmentAt(segments, j); if (seg != null && (tab = seg.table) != null) { for (int i = 0 ; i < tab.length; i++) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { V v = e.value; if (v != null && value.equals(v)) { found = true; break outer; } } } sum += seg.modCount; } } if (retries > 0 && sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return found; } public boolean contains(Object value) { return containsValue(value); } @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); } @SuppressWarnings("unchecked") public V putIfAbsent(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j); return s.put(key, hash, value, true); } public void putAll(Map<? extends K, ? extends V> m) { for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) put(e.getKey(), e.getValue()); } public V remove(Object key) { int hash = hash(key); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); } public boolean remove(Object key, Object value) { int hash = hash(key); Segment<K,V> s; return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null; } public boolean replace(K key, V oldValue, V newValue) { int hash = hash(key); if (oldValue == null || newValue == null) throw new NullPointerException(); Segment<K,V> s = segmentForHash(hash); return s != null && s.replace(key, hash, oldValue, newValue); } public V replace(K key, V value) { int hash = hash(key); if (value == null) throw new NullPointerException(); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.replace(key, hash, value); } public void clear() { final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> s = segmentAt(segments, j); if (s != null) s.clear(); } } public Set<K> keySet() { Set<K> ks = keySet; return (ks != null) ? ks : (keySet = new KeySet()); } public Collection<V> values() { Collection<V> vs = values; return (vs != null) ? vs : (values = new Values()); } public Set<Map.Entry<K,V>> entrySet() { Set<Map.Entry<K,V>> es = entrySet; return (es != null) ? es : (entrySet = new EntrySet()); } public Enumeration<K> keys() { return new KeyIterator(); } public Enumeration<V> elements() { return new ValueIterator(); } <span style="color:#ff0000;"> /* ---------------- Iterator Support -------------- */ </span> abstract class HashIterator { int nextSegmentIndex; int nextTableIndex; HashEntry<K,V>[] currentTable; HashEntry<K, V> nextEntry; HashEntry<K, V> lastReturned; HashIterator() { nextSegmentIndex = segments.length - 1; nextTableIndex = -1; advance(); } final void advance() { for (;;) { if (nextTableIndex >= 0) { if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null) break; } else if (nextSegmentIndex >= 0) { Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--); if (seg != null && (currentTable = seg.table) != null) nextTableIndex = currentTable.length - 1; } else break; } } final HashEntry<K,V> nextEntry() { HashEntry<K,V> e = nextEntry; if (e == null) throw new NoSuchElementException(); lastReturned = e; // cannot assign until after null check if ((nextEntry = e.next) == null) advance(); return e; } public final boolean hasNext() { return nextEntry != null; } public final boolean hasMoreElements() { return nextEntry != null; } public final void remove() { if (lastReturned == null) throw new IllegalStateException(); ConcurrentHashMap.this.remove(lastReturned.key); lastReturned = null; } } final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> { public final K next() { return super.nextEntry().key; } public final K nextElement() { return super.nextEntry().key; } } final class ValueIterator extends HashIterator implements Iterator<V>, Enumeration<V> { public final V next() { return super.nextEntry().value; } public final V nextElement() { return super.nextEntry().value; } } final class WriteThroughEntry extends AbstractMap.SimpleEntry<K,V> { WriteThroughEntry(K k, V v) { super(k,v); } public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; } } final class EntryIterator extends HashIterator implements Iterator<Entry<K,V>> { public Map.Entry<K,V> next() { HashEntry<K,V> e = super.nextEntry(); return new WriteThroughEntry(e.key, e.value); } } final class KeySet extends AbstractSet<K> { public Iterator<K> iterator() { return new KeyIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsKey(o); } public boolean remove(Object o) { return ConcurrentHashMap.this.remove(o) != null; } public void clear() { ConcurrentHashMap.this.clear(); } } final class Values extends AbstractCollection<V> { public Iterator<V> iterator() { return new ValueIterator(); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public boolean contains(Object o) { return ConcurrentHashMap.this.containsValue(o); } public void clear() { ConcurrentHashMap.this.clear(); } } final class EntrySet extends AbstractSet<Map.Entry<K,V>> { public Iterator<Map.Entry<K,V>> iterator() { return new EntryIterator(); } public boolean contains(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; V v = ConcurrentHashMap.this.get(e.getKey()); return v != null && v.equals(e.getValue()); } public boolean remove(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; return ConcurrentHashMap.this.remove(e.getKey(), e.getValue()); } public int size() { return ConcurrentHashMap.this.size(); } public boolean isEmpty() { return ConcurrentHashMap.this.isEmpty(); } public void clear() { ConcurrentHashMap.this.clear(); } } <span style="color:#ff0000;"> /* ---------------- Serialization Support -------------- */ </span> private void writeObject(java.io.ObjectOutputStream s) throws IOException { // force all segments for serialization compatibility for (int k = 0; k < segments.length; ++k) ensureSegment(k); s.defaultWriteObject(); final Segment<K,V>[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment<K,V> seg = segmentAt(segments, k); seg.lock(); try { HashEntry<K,V>[] tab = seg.table; for (int i = 0; i < tab.length; ++i) { HashEntry<K,V> e; for (e = entryAt(tab, i); e != null; e = e.next) { s.writeObject(e.key); s.writeObject(e.value); } } } finally { seg.unlock(); } } s.writeObject(null); s.writeObject(null); } @SuppressWarnings("unchecked") private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { // Don't call defaultReadObject() ObjectInputStream.GetField oisFields = s.readFields(); final Segment<K,V>[] oisSegments = (Segment<K,V>[])oisFields.get("segments", null); final int ssize = oisSegments.length; if (ssize < 1 || ssize > MAX_SEGMENTS || (ssize & (ssize-1)) != 0 ) // ssize not power of two throw new java.io.InvalidObjectException("Bad number of segments:" + ssize); int sshift = 0, ssizeTmp = ssize; while (ssizeTmp > 1) { ++sshift; ssizeTmp >>>= 1; } UNSAFE.putIntVolatile(this, SEGSHIFT_OFFSET, 32 - sshift); UNSAFE.putIntVolatile(this, SEGMASK_OFFSET, ssize - 1); UNSAFE.putObjectVolatile(this, SEGMENTS_OFFSET, oisSegments); // set hashMask UNSAFE.putIntVolatile(this, HASHSEED_OFFSET, randomHashSeed(this)); // Re-initialize segments to be minimally sized, and let grow. int cap = MIN_SEGMENT_TABLE_CAPACITY; final Segment<K,V>[] segments = this.segments; for (int k = 0; k < segments.length; ++k) { Segment<K,V> seg = segments[k]; if (seg != null) { seg.threshold = (int)(cap * seg.loadFactor); seg.table = (HashEntry<K,V>[]) new HashEntry[cap]; } } // Read the keys and values, and put the mappings in the table for (;;) { K key = (K) s.readObject(); V value = (V) s.readObject(); if (key == null) break; put(key, value); } } <span style="color:#ff0000;"> // Unsafe mechanics </span> private static final sun.misc.Unsafe UNSAFE; private static final long SBASE; private static final int SSHIFT; private static final long TBASE; private static final int TSHIFT; private static final long HASHSEED_OFFSET; private static final long SEGSHIFT_OFFSET; private static final long SEGMASK_OFFSET; private static final long SEGMENTS_OFFSET; static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); } }
ConcurrentMap源码为:
public interface ConcurrentMap<K, V> extends Map<K, V> { /** * 仅当K没有相应的映射值时才插入 */ V putIfAbsent(K key, V value); /** * 仅当K被映射到V时才移除 */ boolean remove(Object key, Object value); /** * 仅当K被映射到oldValue时才替换为newValue * */ boolean replace(K key, V oldValue, V newValue); /** *仅当K被映射到某个值时才替换为value */ V replace(K key, V value); }
CopyOnWriteArrayList
CopyOnWriteArrayList:用来替代同步List,在迭代期间不需要对容器进行加锁或复制。CopyOnWriteArraySet:用来替代同步Set。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变现。“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。“写入时复制”容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建的元素完全一致,而不必考虑之后修改操作所带来的影响。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。这个准则很好地描述了许多事件通知系统:在分发通知时需要迭代已注册监听器链表,并调用每一个监听器,在大多数情况下,注册和注销事件监听器的操作远少于接收事件通知的操作。
CopyOnWriteArrayList的源代码为:
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { private static final long serialVersionUID = 8673264195747942595L; transient final ReentrantLock lock = new ReentrantLock(); private volatile transient Object[] array; final Object[] getArray() { return array; } final void setArray(Object[] a) { array = a; } public CopyOnWriteArrayList() { setArray(new Object[0]); } public CopyOnWriteArrayList(Collection<? extends E> c) { Object[] elements = c.toArray(); // c.toArray might (incorrectly) not return Object[] (see 6260652) if (elements.getClass() != Object[].class) elements = Arrays.copyOf(elements, elements.length, Object[].class); setArray(elements); } public CopyOnWriteArrayList(E[] toCopyIn) { setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); } public int size() { return getArray().length; } public boolean isEmpty() { return size() == 0; } private static boolean eq(Object o1, Object o2) { return (o1 == null ? o2 == null : o1.equals(o2)); } private static int indexOf(Object o, Object[] elements, int index, int fence) { if (o == null) { for (int i = index; i < fence; i++) if (elements[i] == null) return i; } else { for (int i = index; i < fence; i++) if (o.equals(elements[i])) return i; } return -1; } private static int lastIndexOf(Object o, Object[] elements, int index) { if (o == null) { for (int i = index; i >= 0; i--) if (elements[i] == null) return i; } else { for (int i = index; i >= 0; i--) if (o.equals(elements[i])) return i; } return -1; } public boolean contains(Object o) { Object[] elements = getArray(); return indexOf(o, elements, 0, elements.length) >= 0; } public int indexOf(Object o) { Object[] elements = getArray(); return indexOf(o, elements, 0, elements.length); } public int indexOf(E e, int index) { Object[] elements = getArray(); return indexOf(e, elements, index, elements.length); } public int lastIndexOf(Object o) { Object[] elements = getArray(); return lastIndexOf(o, elements, elements.length - 1); } public int lastIndexOf(E e, int index) { Object[] elements = getArray(); return lastIndexOf(e, elements, index); } public Object clone() { try { CopyOnWriteArrayList c = (CopyOnWriteArrayList)(super.clone()); c.resetLock(); return c; } catch (CloneNotSupportedException e) { // this shouldn't happen, since we are Cloneable throw new InternalError(); } } public Object[] toArray() { Object[] elements = getArray(); return Arrays.copyOf(elements, elements.length); } @SuppressWarnings("unchecked") public <T> T[] toArray(T a[]) { Object[] elements = getArray(); int len = elements.length; if (a.length < len) return (T[]) Arrays.copyOf(elements, len, a.getClass()); else { System.arraycopy(elements, 0, a, 0, len); if (a.length > len) a[len] = null; return a; } } // Positional Access Operations @SuppressWarnings("unchecked") private E get(Object[] a, int index) { return (E) a[index]; } public E get(int index) { return get(getArray(), index); } public E set(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { // Not quite a no-op; ensures volatile write semantics setArray(elements); } return oldValue; } finally { lock.unlock(); } } public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } public void add(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); Object[] newElements; int numMoved = len - index; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + 1); else { newElements = new Object[len + 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + 1, numMoved); } newElements[index] = element; setArray(newElements); } finally { lock.unlock(); } } public E remove(int index) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) setArray(Arrays.copyOf(elements, len - 1)); else { Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } } public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (len != 0) { // Copy while searching for element to remove // This wins in the normal case of element being present int newlen = len - 1; Object[] newElements = new Object[newlen]; for (int i = 0; i < newlen; ++i) { if (eq(o, elements[i])) { // found one; copy remaining and exit for (int k = i + 1; k < len; ++k) newElements[k-1] = elements[k]; setArray(newElements); return true; } else newElements[i] = elements[i]; } // special handling for last cell if (eq(o, elements[newlen])) { setArray(newElements); return true; } } return false; } finally { lock.unlock(); } } private void removeRange(int fromIndex, int toIndex) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (fromIndex < 0 || toIndex > len || toIndex < fromIndex) throw new IndexOutOfBoundsException(); int newlen = len - (toIndex - fromIndex); int numMoved = len - toIndex; if (numMoved == 0) setArray(Arrays.copyOf(elements, newlen)); else { Object[] newElements = new Object[newlen]; System.arraycopy(elements, 0, newElements, 0, fromIndex); System.arraycopy(elements, toIndex, newElements, fromIndex, numMoved); setArray(newElements); } } finally { lock.unlock(); } } public boolean addIfAbsent(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // Copy while checking if already present. // This wins in the most common case where it is not present Object[] elements = getArray(); int len = elements.length; Object[] newElements = new Object[len + 1]; for (int i = 0; i < len; ++i) { if (eq(e, elements[i])) return false; // exit, throwing away copy else newElements[i] = elements[i]; } newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } public boolean containsAll(Collection<?> c) { Object[] elements = getArray(); int len = elements.length; for (Object e : c) { if (indexOf(e, elements, 0, len) < 0) return false; } return true; } public boolean removeAll(Collection<?> c) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (len != 0) { // temp array holds those elements we know we want to keep int newlen = 0; Object[] temp = new Object[len]; for (int i = 0; i < len; ++i) { Object element = elements[i]; if (!c.contains(element)) temp[newlen++] = element; } if (newlen != len) { setArray(Arrays.copyOf(temp, newlen)); return true; } } return false; } finally { lock.unlock(); } } public boolean retainAll(Collection<?> c) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (len != 0) { // temp array holds those elements we know we want to keep int newlen = 0; Object[] temp = new Object[len]; for (int i = 0; i < len; ++i) { Object element = elements[i]; if (c.contains(element)) temp[newlen++] = element; } if (newlen != len) { setArray(Arrays.copyOf(temp, newlen)); return true; } } return false; } finally { lock.unlock(); } } public int addAllAbsent(Collection<? extends E> c) { Object[] cs = c.toArray(); if (cs.length == 0) return 0; Object[] uniq = new Object[cs.length]; final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; int added = 0; for (int i = 0; i < cs.length; ++i) { // scan for duplicates Object e = cs[i]; if (indexOf(e, elements, 0, len) < 0 && indexOf(e, uniq, 0, added) < 0) uniq[added++] = e; } if (added > 0) { Object[] newElements = Arrays.copyOf(elements, len + added); System.arraycopy(uniq, 0, newElements, len, added); setArray(newElements); } return added; } finally { lock.unlock(); } } public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { setArray(new Object[0]); } finally { lock.unlock(); } } public boolean addAll(Collection<? extends E> c) { Object[] cs = c.toArray(); if (cs.length == 0) return false; final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + cs.length); System.arraycopy(cs, 0, newElements, len, cs.length); setArray(newElements); return true; } finally { lock.unlock(); } } public boolean addAll(int index, Collection<? extends E> c) { Object[] cs = c.toArray(); final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); if (cs.length == 0) return false; int numMoved = len - index; Object[] newElements; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + cs.length); else { newElements = new Object[len + cs.length]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + cs.length, numMoved); } System.arraycopy(cs, 0, newElements, index, cs.length); setArray(newElements); return true; } finally { lock.unlock(); } } private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException{ s.defaultWriteObject(); Object[] elements = getArray(); // Write out array length s.writeInt(elements.length); // Write out all elements in the proper order. for (Object element : elements) s.writeObject(element); } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); // bind to new lock resetLock(); // Read in array length and allocate array int len = s.readInt(); Object[] elements = new Object[len]; // Read in all elements in the proper order. for (int i = 0; i < len; i++) elements[i] = s.readObject(); setArray(elements); } public String toString() { return Arrays.toString(getArray()); } public boolean equals(Object o) { if (o == this) return true; if (!(o instanceof List)) return false; List<?> list = (List<?>)(o); Iterator<?> it = list.iterator(); Object[] elements = getArray(); int len = elements.length; for (int i = 0; i < len; ++i) if (!it.hasNext() || !eq(elements[i], it.next())) return false; if (it.hasNext()) return false; return true; } public int hashCode() { int hashCode = 1; Object[] elements = getArray(); int len = elements.length; for (int i = 0; i < len; ++i) { Object obj = elements[i]; hashCode = 31*hashCode + (obj==null ? 0 : obj.hashCode()); } return hashCode; } public Iterator<E> iterator() { return new COWIterator<E>(getArray(), 0); } public ListIterator<E> listIterator() { return new COWIterator<E>(getArray(), 0); } public ListIterator<E> listIterator(final int index) { Object[] elements = getArray(); int len = elements.length; if (index<0 || index>len) throw new IndexOutOfBoundsException("Index: "+index); return new COWIterator<E>(elements, index); } private static class COWIterator<E> implements ListIterator<E> { /** Snapshot of the array */ private final Object[] snapshot; /** Index of element to be returned by subsequent call to next. */ private int cursor; private COWIterator(Object[] elements, int initialCursor) { cursor = initialCursor; snapshot = elements; } public boolean hasNext() { return cursor < snapshot.length; } public boolean hasPrevious() { return cursor > 0; } @SuppressWarnings("unchecked") public E next() { if (! hasNext()) throw new NoSuchElementException(); return (E) snapshot[cursor++]; } @SuppressWarnings("unchecked") public E previous() { if (! hasPrevious()) throw new NoSuchElementException(); return (E) snapshot[--cursor]; } public int nextIndex() { return cursor; } public int previousIndex() { return cursor-1; } public void remove() { throw new UnsupportedOperationException(); } public void set(E e) { throw new UnsupportedOperationException(); } public void add(E e) { throw new UnsupportedOperationException(); } } public List<E> subList(int fromIndex, int toIndex) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (fromIndex < 0 || toIndex > len || fromIndex > toIndex) throw new IndexOutOfBoundsException(); return new COWSubList<E>(this, fromIndex, toIndex); } finally { lock.unlock(); } } private static class COWSubList<E> extends AbstractList<E> implements RandomAccess { private final CopyOnWriteArrayList<E> l; private final int offset; private int size; private Object[] expectedArray; // only call this holding l's lock COWSubList(CopyOnWriteArrayList<E> list, int fromIndex, int toIndex) { l = list; expectedArray = l.getArray(); offset = fromIndex; size = toIndex - fromIndex; } // only call this holding l's lock private void checkForComodification() { if (l.getArray() != expectedArray) throw new ConcurrentModificationException(); } // only call this holding l's lock private void rangeCheck(int index) { if (index<0 || index>=size) throw new IndexOutOfBoundsException("Index: "+index+ ",Size: "+size); } public E set(int index, E element) { final ReentrantLock lock = l.lock; lock.lock(); try { rangeCheck(index); checkForComodification(); E x = l.set(index+offset, element); expectedArray = l.getArray(); return x; } finally { lock.unlock(); } } public E get(int index) { final ReentrantLock lock = l.lock; lock.lock(); try { rangeCheck(index); checkForComodification(); return l.get(index+offset); } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); return size; } finally { lock.unlock(); } } public void add(int index, E element) { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); if (index<0 || index>size) throw new IndexOutOfBoundsException(); l.add(index+offset, element); expectedArray = l.getArray(); size++; } finally { lock.unlock(); } } public void clear() { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); l.removeRange(offset, offset+size); expectedArray = l.getArray(); size = 0; } finally { lock.unlock(); } } public E remove(int index) { final ReentrantLock lock = l.lock; lock.lock(); try { rangeCheck(index); checkForComodification(); E result = l.remove(index+offset); expectedArray = l.getArray(); size--; return result; } finally { lock.unlock(); } } public boolean remove(Object o) { int index = indexOf(o); if (index == -1) return false; remove(index); return true; } public Iterator<E> iterator() { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); return new COWSubListIterator<E>(l, 0, offset, size); } finally { lock.unlock(); } } public ListIterator<E> listIterator(final int index) { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); if (index<0 || index>size) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+size); return new COWSubListIterator<E>(l, index, offset, size); } finally { lock.unlock(); } } public List<E> subList(int fromIndex, int toIndex) { final ReentrantLock lock = l.lock; lock.lock(); try { checkForComodification(); if (fromIndex<0 || toIndex>size) throw new IndexOutOfBoundsException(); return new COWSubList<E>(l, fromIndex + offset, toIndex + offset); } finally { lock.unlock(); } } } private static class COWSubListIterator<E> implements ListIterator<E> { private final ListIterator<E> i; private final int index; private final int offset; private final int size; COWSubListIterator(List<E> l, int index, int offset, int size) { this.index = index; this.offset = offset; this.size = size; i = l.listIterator(index+offset); } public boolean hasNext() { return nextIndex() < size; } public E next() { if (hasNext()) return i.next(); else throw new NoSuchElementException(); } public boolean hasPrevious() { return previousIndex() >= 0; } public E previous() { if (hasPrevious()) return i.previous(); else throw new NoSuchElementException(); } public int nextIndex() { return i.nextIndex() - offset; } public int previousIndex() { return i.previousIndex() - offset; } public void remove() { throw new UnsupportedOperationException(); } public void set(E e) { throw new UnsupportedOperationException(); } public void add(E e) { throw new UnsupportedOperationException(); } } // Support for resetting lock while deserializing private void resetLock() { UNSAFE.putObjectVolatile(this, lockOffset, new ReentrantLock()); } private static final sun.misc.Unsafe UNSAFE; private static final long lockOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = CopyOnWriteArrayList.class; lockOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("lock")); } catch (Exception e) { throw new Error(e); } } }
相关文章推荐
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList
- java并发编程--并发容器CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- java并发编程:并发容器之CopyOnWriteArrayList(转)
- Java并发编程:并发容器之CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- java并发编程(11)--并发容器ConcurrentHashMap和CopyOnWriteArrayList
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
- java并发编程:并发容器之CopyOnWriteArrayList(转)