并发容器CopyOnWriteArrayList、ConcurrentLinkedQueue、ArrayBlockingQueue原理与使用
2018-04-01 09:49
585 查看
一、为什么要使用并发容器
举例说明:
线程安全容器–>线程不安全容器–>并发容器Vector –>ArrayList –>CopyOnWriteArrayList
Hashtable –>HashMap –>ConcurrentHashMap
从上面可以发现其中线程安全的容器Vector、Hashtable其实现原理都是通过在方法上加synchronized来实现线程安全的、这如果在多线程的环境下将只允许一个线程进行读写,这就相当于多线程差不多废了,而不安全的肯定就是没加synchronized咯,但是也可以通过Collections来实现线程安全,原理就是在运行的代码上通过synchronized加锁、方法还是一样,只不过级别从方法级别变成了代码块级别
安全性容器牺牲了性能,线程不安全的肯定就不安全咯,所以有时我们就得选择并发容器
import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; public class Demo { public static void main(String[] args) { ArrayList<String> s = new ArrayList<>(); Collections.synchronizedList(s); HashMap<String, Object> res = new HashMap<>(); Collections.synchronizedMap(res); } }
二、CopyOnWriteArrayList
原理看上图估计也可以看出来,就是先拷贝一份,等修改完毕之后在把变量的引用地址修改成新的数组即可,在修改的时候读操作不影响。
set的话就是先拷贝在判断是否一样、不一样才进行替换操作
remove基本差不多就是先拷贝在删除之后所有之后的下标全部前移
使用的话就跟ArrayList一样
三、ConcurrentLinkedQueue
1、ConcurrentLinkedQueue的介绍
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。2、ConcurrentLinkedQueue的结构
我们通过ConcurrentLinkedQueue的类图来分析一下它的结构。ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点等于head节点。
1、private transient volatile Node tail = head;
3、入队列
入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点我就做了一个队列的快照图。第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
第三步添加元素3,设置tail节点的next节点为元素3节点。
第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。
通过debug入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。
上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //入队前,创建一个入队节点 Node</e><e> n = new Node</e><e>(e); retry: //死循环,入队不成功反复入队。 for (;;) { //创建一个指向tail节点的引用 Node</e><e> t = tail; //p用来表示队列的尾节点,默认情况下等于tail节点。 Node</e><e> p = t; for (int hops = 0; ; hops++) { //获得p节点的下一个节点。 Node</e><e> next = succ(p); //next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点 if (next != null) { //循环了两次及其以上,并且当前节点还是不等于尾节点 if (hops > HOPS && t != tail) continue retry; p = next; } //如果p是尾节点,则设置p节点的next节点为入队节点。 else if (p.casNext(null, n)) { //如果tail节点有大于等于1个next节点,则将入队节点设置成tair节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tair节点。 if (hops >= HOPS) casTail(t, n); // 更新tail节点,允许失败 return true; } // p有next节点,表示p的next节点是尾节点,则重新设置p节点 else { p = succ(p); } } } }
从源代码角度来看整个入队过程主要做二件事情。第一是定位出尾节点,第二是使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。
第一步定位尾节点。tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能就是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点。获取p节点的next节点代码如下
final Node</e><e> succ(Node</e><e> p) { Node</e><e> next = p.getNext(); return (p == next) ? head : next; }
第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
hops的设计意图。上面分析过对于先进先出的队列入队所要做的事情就是将入队节点设置成尾节点,doug lea写的代码和逻辑还是稍微有点复杂。那么我用以下方式来实现行不行?
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node</e><e> n = new Node</e><e>(e); for (;;) { Node</e><e> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率,所以doug lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将 tail节点更新成尾节点,而是当 tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。
private static final int HOPS = 1;
还有一点需要注意的是入队方法永远返回true,所以不要通过返回值判断入队是否成功。
4、出队列
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。让我们再通过源码来深入分析下出队过程。
public E poll() { Node</e><e> h = head; // p表示头节点,需要出队的节点 Node</e><e> p = h; for (int hops = 0;; hops++) { // 获取p节点的元素 E item = p.getItem(); // 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果成功则返回p节点的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { //将p节点下一个节点设置成head节点 Node</e><e> q = p.getNext(); updateHead(h, (q != null) ? q : p); } return item; } // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取p节点的下一个节点 Node</e><e> next = succ(p); // 如果p的下一个节点也为空,说明这个队列已经空了 if (next == null) { // 更新头节点。 updateHead(h, p); break; } // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 p = next; } return null; }
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
四、ArrayBlockingQueue
1、ArrayBlockingQueue的介绍
ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。常用的操作包括 add ,offer,put,remove,poll,take,peek。前三者add offer put 是插入的操作。后面四个方法是取出的操作。他们之间的区别和关联:
add: 内部实际上获取的offer方法,当Queue已经满了时,抛出一个异常。不会阻塞。
offer:当Queue已经满了时,返回false。不会阻塞。
put:当Queue已经满了时,会进入等待,只要不被中断,就会插入数据到队列中。会阻塞,可以响应中断。
取出方法中 remove和add相互对应。也就是说,调用remove方法时,假如对列为空,则抛出一场。另外的,poll与offer相互对应。take和put相互对应。peek方法比较特殊,前三个取出的方法,都会将元素从Queue的头部溢出,但是peek不会,实际上只是,获取队列头的元素。peek方法也不会阻塞。当队列为空时,直接返回Null。
因此我们只用关注put和take方法即可。
2、源码分析
1、保存数据的结构/** The queued items */ final Object[] items;
可以看到,是一个Object的数组。
2、共享锁
/** Main lock guarding all access */ final ReentrantLock lock;
注视也说明了,这是一个掌管所有访问操作的锁。全局共享。都会使用这个锁。
3、put方法
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可中断的获取锁 try { while (count == items.length) //当线程从等待中被唤醒时,会比较当前队列是否已经满了 notFull.await(); //notFull = lock.newCondition 表示队列不满这种状况,假如现场在插入的时候 enqueue(e); //当前队列已经满了时,则需要等到这种情况的发生。 } finally { //可以看出这是一种阻塞式的插入方式 lock.unlock(); } }
当前队列不没满count加一插入
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
4、take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //线程在刚进入 和 被唤醒时,会查看当前队列是否为空 notEmpty.await(); //notEmpty=lock.newCondition表示队列不为空的这种情况。假如一个线程进行take return dequeue(); //操作时,队列为空,则会一直等到到这种情况发生。 } finally { lock.unlock(); } }
3、例子
接口public interface Shop { public void push (); public void take (); public void size() ; }
生产者的实现
public class PushTarget implements Runnable { private Shop tmall; public PushTarget(Shop tmall) { this.tmall = tmall; } @Override public void run() { while(true) { tmall.push(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者的实现
public class TakeTarget implements Runnable { private Shop tmall; public TakeTarget(Shop tmall) { this.tmall = tmall; } @Override public void run() { while(true) { tmall.take(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
具体实现类
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Tmall3 implements Shop { public final int MAX_COUNT = 10; private BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(MAX_COUNT); public void push() { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void take() { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } public void size() { while (true) { System.out.println("当前队列的长队为:" + queue.size()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试
public class Main { public static void main(String[] args) { Shop tmall = new Tmall3(); PushTarget p = new PushTarget(tmall); TakeTarget t = new TakeTarget(tmall); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(t).start(); new Thread(new Runnable() { @Override public void run() { tmall.size(); } }).start(); } }
参考:http://ifeve.com/concurrentlinkedqueue/
相关文章推荐
- 并发队列ConcurrentLinkedQueue、阻塞队列AraayBlockingQueue、阻塞队列LinkedBlockingQueue 区别 和 使用场景总结
- ArrayBlockingQueue和LinkedBlockingQueue的使用
- 并发队列ConcurrentLinkedQueue、阻塞队列AraayBlockingQueue、阻塞队列LinkedBlockingQueue 区别 和 使用场景总结
- 非阻塞队列ConcurrentLinkedQueue与阻塞队列LinkedBlockingQueue原理探究
- ConcurrentLinkedQueue、AraayBlockingQueue、LinkedBlockingQueue 区别及使用场景
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结(待整理)
- ConcurrentLinkedQueue ArrayBlockingQueue SynchronousQueue例子
- 自己总结 :并发队列ConcurrentLinkedQueue、阻塞队列AraayBlockingQueue、阻塞队列LinkedBlockingQueue 区别 和 使用场景总结
- 并发队列ConcurrentLinkedQueue和阻塞栈LinkedBlockingQueue用法和阻塞队列ArrayBlockingQueue
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- ArrayBlockingQueue和LinkedBlockingQueue的使用
- [源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析(还没看,先马)
- ArrayBlockingQueue与LinkedBlockingQueue的使用及区别
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- ArrayBlockingQueue和LinkedBlockingQueue的使用
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue