您的位置:首页 > 编程语言 > Java开发

Java多线程学习笔记—从Map开始说说同步和并发

2014-12-06 16:58 615 查看
在Java的关于Map集合中,我们经常会比较Hashtable和HashMap的区别。其中一点是区别关于线程安全,我们知道Hashtable是线程安全的,HashMap是非线程安全的。本文从Hashtable开始讨论说起关于Map的线程安全。通过源码我们知道:

1.	public synchronized int size(){...}  
2.	public synchronized boolean isEmpty(){...}  
3.	public synchronized boolean containsKey(Object key){...}  
4.	public synchronized boolean contains(Object value){...}  
5.	public synchronized V get(Object key){...}  
6.	public synchronized V put(K key, V value){...}  
7.	public synchronized V remove(Object key){...}  
8.	......


Hashtable通过对方法synchronized实现同步操作,我们知道synchronized修饰方法,我们实现的上锁机制是对整个类或者对象上锁,这样的同步方式并发效率并不高。比如,多个线程同时访问Hashtable对象的时候,其中一个线程get方法读取哈希表操作,该线程完全独占了Hashtable对象的锁,其他线程想同时get读取或者put写入都不行,这个的同步并发效率实在太低。

我们下面说说HashMap的线程安全实现,我们用该Collections.synchronizedMap 同步包装器来对HashMap进行同步包装。同步包装器的实现原理,我们用一段代码来看看:

public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
        return new SynchronizedMap<>(m);
    }
private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;

        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize

        SynchronizedMap(Map<K,V> m) {
            if (m==null)
                throw new NullPointerException();
            this.m = m;
            mutex = this;
        }
       ......
      }
以上代码我们知道,Collections类下的类synchronizedMap实际上就是对HashMap包装下,然后通过mutex
对象锁来锁住HashMap对象,我们选取SynchronizedMap的一段实现代码:

public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }


通过代码,我们知道HashMap通过对HashMap的方法加独占锁来实现同步,所有我们认为包装器包装过后的HashMap是线程安全的。通过以上代码分析我们,我们知道Hashtable和包装器包装过后的HashMap同样存在并发度不高的情况。

这里还要补充一点,对包装器包装的HashMap和Hashtable的方法访问我们知道是线程安全的。但是,我们不能认为对HashMap的多线程条件下访问就高枕无忧了,当多个访问Map的组合操作是否线程安全我们还要结合实际情况考虑。例如:以下情况,

if(map.containsKey(key)()
	map.remove(key);
当我们判断完map存在键为key的元素,这个时候另外一个线程刚好获取map对象,remove了键为key的对象,我们线程再返回到该线程继续执行的话,就会出错。这正是我们没有正确理解同步造成的,对Map的访问线程安全了,不代表对Map的系列操作就线程安全了。

通过以上的分析,我们知道Hashtable和Collections.synchronizedMap包装的Map并发度不高的主要限制因素,因为用的全局对象锁来实现互斥访问,让每次只有一个线程能访问。针对Hashtable和Collections.synchronizedMap访问线程安全条件下,并发度太低的问题,java类库的设计者,在JDK1.5的java.util.concurrent包下引入了ConcurrentHashMap,来解决多线程条件下Map并发度太低的问题。ConcurrentHashMap通过数据结构的巧妙设计和锁分离解决Map多线程访问的并发度不高的问题。

ConcurrentHashMap通过锁分离的技术来实现对Map分段上锁和对读/写操作同步不同处理来提高Map的并发度。ConcurrentHashMap包含一个Segment数组,如下:

/**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;
通过对Map下的Segment对象上锁来让同步粒更小来提供并发度,我们看Segments下的数据结构包含如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
	transient volatile HashEntry<K,V>[] table;
……
}
从Segment的定义我们知道,Segment包含一个table,该table时间上是一个HashEntry<K,V>[] 的数组,我们可以理解Segment实际上是一个小的Hashtable。我们再看HashEntry的定义如下:

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }


从HashEntry的定义我们知道HashEntry基本是 不可变的,我们不可以修改HashEntry的引用,不能修改哈希表的键值,我们对value和下一个结点位置的修改,我们修改的值我们马上就能观察到。 HashEntry 类的 value 域被声明为 Volatile 型,Java
的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。因此,我们避免了对Map结点的值访问加锁。

从以上代码分析我们知道,ConcurrentHashMap实际上就是对Hashtable拆分成多段,每段中的Hashtable实现单独的同步。除了以上的处理外,ConcurrentHashMap在读(get)/写(put)处理上也采取不同的同步方式。下面通过代码我们先来分析get方法:

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);//此处定位到Segment上
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {  //以下二次定位到Segment的节点上
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }


get没有使用锁同步,而是使用sun.misc.Unsafe.getObjectVolatile(Object, long)读取,保证读到的是最新的对象。下面我们来看看我们的写操作put的同步实现机制,代码如下:

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);//
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          
             (segments, (j << SSHIFT) + SBASE)) == null) //判断segment是否为空,为空构造segment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }


当查找到段Segment不为空,我们进入读节点操作。代码如下:'

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }


我们来解释下写方法怎么采取锁定策略的,我们分析这句代码HashEntry<K,V> node = tryLock() ?null:scanAndLockForPut(key, hash, value);我们知道,tryLock()方法可重入锁尝试获取锁,如果获取成功,后面的代码操作就是往Hashtable插入节点的常用代码。

如果没有获取到锁,我们看看后面的执行策略,查看方法scanAndLockForPut(key, hash, value)代码如下:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

此处采取自旋锁来不断循环直到获取到线程访问节点的锁,循环才退出去。

此处自旋锁,没有完全采取蛮横的循环下去,有效的减轻传统的自旋锁对CPU资源的大量消耗的缺点,改进在每次循环获取尝试一次后,变量retries计时增加一次,当retries> MAX_SCAN_RETRIES的时候,阻塞该方法,跳出循环,返回节点值;还有,每次执行判断if ((retries & 1) == 0 && (f = entryForHash(this,hash))
!= first),这句话在判断节点是否写操作更新,更新的话,重置retries重新开始遍历。一旦获取了锁,直接返回节点,接着put操作。

通过以上的分析,我们可以总结如下:

1.ConcurrentHashMap通过把Hashtable切割成多个独立的Segment,通过分离锁而非全局锁分别对单独Segment加锁来提高并发;

2.ConcurrentHashMap对读/写操作采取不一样的同步措施,读(get方法)并没有加锁,但是即使写(put方法)同时的时候,读操作总能更新到最新的值;对于写操作,采取加分段加锁的方式处理,读写不同同步策略大大提高了并发度。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: