Java并发学习(二十)-ConcurrentLinkedQueue分析
2017-12-25 00:00
645 查看
前一篇文章学习了ConcurrentHashMap,线程安全的HashMap:Java并发学习(十九)-Java8中ConcurrentHashMap分析 。
此时,有时会需要一种线程安全的队列,它也是Concurrent家族的一员。
使用synchronized来加锁,push以及pop时候加上锁,这样就线程安全了。
使用ReentrantLock等AQS锁来加锁,同样是push和pop时候上锁。
使用CAS方式实现push和pop方法。
当然前两种方法,很容易想到,原理就是加锁,这样的方法称为阻塞式的方法。而第三种方法使用循环CAS的方法则是非阻塞式的,Doug Lea大佬就是用的第三种方法实现了一个线程安全的队列,说起来简单,其内部设计并不像说起来这么简单的。
使用 CAS 原子指令来处理对数据的并发访问,替换next等节点都是CAS方式,这是非阻塞算法得以实现的基础。
head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键,为什么是关键呢?比如你要在队尾增加一个元素。原来情况,你需要做2件事,一是把新元素链接上去,二是更改tail的指向,现在你只需要做第一件事了。
FIFO,先入先出。
当然一切,都是由head和tail的节点特性保证。
接下来看head和tail节点的特性:
head节点:
如果节点未删除,则从head可以通过succ()方法遍历达到
head 不能为 null
head 节点的 next 域不能引用到自身
head 节点的 item 域可能为 null,也可能不为 null
tail节点:
tail不能为null
通过 tail 调用 succ() 方法,最后节点总是可达的
tail 节点的 next 域可以引用到自身
tail 节点的 item 域可能为 null,也可能不为 null
接下来具体分析几个重要方法。
先看它的源代码:
这样一种典型情况:
1. 两个或多个线程同时走到第10行。
2. 最终结果是只有一个成功,因为CAS是原子性的,并且是和p.next对比,只有p.next==null,才会成功。
3. 假设最终线程A成功了,然后它尝试去更新尾节点,这里允许失败。
4. 一个新的线程要去执行offer方法增加节点,它会判断出这个tail不是真tail,因为tail.next !=null,它不会执行第10行那一段if语句里面的,会去寻找,直到找到新tail然后执行成功返回。如果实在找不到,可以通过head,利用succ方法一路找(head不变性)。
还是假设一个并发场景:
1. 多个线程尝试弹出一个元素,都会执行这一句:
2. 当成功设item为null后,只需要尝试性将head节点设置下,允许失败,最终都会返回item。
3. 如果此后其他线程来到poll方法,则如果队列不为null,前三个判断都会执行失败,执行第22行,直接会把下一个节点设置为头节点。相应的item也会变化,就可以重新进入循环开始自己线程的操作了。
这两个都是属于非阻塞性并发算法。
lock-free:无锁算法,比如在一组并发事务中,一个事务被阻塞了,cpu还能处理其他的事务,并不会占用cpu时间,从而确保cpu一直工作,提高处理吞吐量,但是会导致部分时延。
wait-free:通常在多核系统中,每个cpu都能分别处理一个并发事务。由于其对时延要求高,一般不会阻塞事务。在牺牲部分处理吞吐量的情况下,保证事务在一段时间内完成。linux系统内核(Linux kernel )就是一个很典型的wait-free算法实例。可以这样理解,wait-free,就是保证你的操作在一定时间内能够完成。
参考资料:
1. JDK1.8
2. http://ifeve.com/concurrentlinkedqueue/
3. https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/index.html
4. https://rethinkdb.com/blog/lock-free-vs-wait-free-concurrency/
此时,有时会需要一种线程安全的队列,它也是Concurrent家族的一员。
What is ConcurrentLinkedQueue
从名字可以大概猜到,是一个线程安全的队列。现在来想想,里面是怎么实现的呢?有以下几种方案:使用synchronized来加锁,push以及pop时候加上锁,这样就线程安全了。
使用ReentrantLock等AQS锁来加锁,同样是push和pop时候上锁。
使用CAS方式实现push和pop方法。
当然前两种方法,很容易想到,原理就是加锁,这样的方法称为阻塞式的方法。而第三种方法使用循环CAS的方法则是非阻塞式的,Doug Lea大佬就是用的第三种方法实现了一个线程安全的队列,说起来简单,其内部设计并不像说起来这么简单的。
特性
总览一下ConcurrentLinkedQueue的特性:使用 CAS 原子指令来处理对数据的并发访问,替换next等节点都是CAS方式,这是非阻塞算法得以实现的基础。
head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键,为什么是关键呢?比如你要在队尾增加一个元素。原来情况,你需要做2件事,一是把新元素链接上去,二是更改tail的指向,现在你只需要做第一件事了。
FIFO,先入先出。
当然一切,都是由head和tail的节点特性保证。
接下来看head和tail节点的特性:
head节点:
如果节点未删除,则从head可以通过succ()方法遍历达到
head 不能为 null
head 节点的 next 域不能引用到自身
head 节点的 item 域可能为 null,也可能不为 null
tail节点:
tail不能为null
通过 tail 调用 succ() 方法,最后节点总是可达的
tail 节点的 next 域可以引用到自身
tail 节点的 item 域可能为 null,也可能不为 null
接下来具体分析几个重要方法。
offer()
在ConcurrentLinkedQueue中,offer方法,用于往队列尾端添加一个元素。永远只会返回true
先看它的源代码:
public boolean offer(E e) { //检查是否为null checkNotNull(e); //实例化一个node final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) {//自旋式,直到成功 //p表示尾节点,默认等于tail节点 Node<E> q = p.next; if (q == null) { //q为null,说明p是最后一个节点,tail指向了最后一个节点。 if (p.casNext(null, newNode)) { //尝试在将newNode设置为p的next节点。 if (p != t) //检测p是不是tail,不是的话,说明有线程瞬间也成功了,更换tail节点。 /*被别的节点抢先也没有关系,这里不管有没有把 newNode设为tail节点,最终会执行下面的return true。 所以tail可能不是指向最后一个节点。 */ casTail(t, newNode); return true; } //插入失败,肯定就是被其他线程插入成功了。但是自己的一定要成功才能返回。 } else if (p == q) //被人抢先了,tail已经改变了,所以p也要改变。如果在比较时候,仍然被人抢先,那就调到head重新走一遍。 p = (t != (t = tail)) ? t : head; else //p是t节点,但是实际上tail已经被改变了。 p = (p != t && t != (t = tail)) ? t : q; } }
这样一种典型情况:
1. 两个或多个线程同时走到第10行。
2. 最终结果是只有一个成功,因为CAS是原子性的,并且是和p.next对比,只有p.next==null,才会成功。
3. 假设最终线程A成功了,然后它尝试去更新尾节点,这里允许失败。
4. 一个新的线程要去执行offer方法增加节点,它会判断出这个tail不是真tail,因为tail.next !=null,它不会执行第10行那一段if语句里面的,会去寻找,直到找到新tail然后执行成功返回。如果实在找不到,可以通过head,利用succ方法一路找(head不变性)。
poll()
出队方法,看它的源代码:public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; //获取p节点元素。 if (item != null && p.casItem(item, null)) { //item不为null,所以证明p还没有被弹出。因为要弹出head,所以把这里的item设为null。 //成功了 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); //决定是用p还是用p.next来替换头节点。 //updateHead允许失败,失败的话,head.item=null,但是head不为null。 return item; } else if ((q = p.next) == null) { //如果只有一个头节点。说明队列已经空了。 updateHead(h, p); return null; } else if (p == q) //重来一遍。 continue restartFromHead; else //如果下一个元素不为null,则把下一个元素设为头节点 p = q; } } }
还是假设一个并发场景:
1. 多个线程尝试弹出一个元素,都会执行这一句:
item != null && p.casItem(item, null),验证item是否为null,不为null才继续执行CAS操作。也就是说,如果此时有一个线程成功把head.item设为null了,其他线程将无法完成CAS操作。
2. 当成功设item为null后,只需要尝试性将head节点设置下,允许失败,最终都会返回item。
3. 如果此后其他线程来到poll方法,则如果队列不为null,前三个判断都会执行失败,执行第22行,直接会把下一个节点设置为头节点。相应的item也会变化,就可以重新进入循环开始自己线程的操作了。
size()
和其他的并发容器的求和方法一样,ConcurrentLinkedQueue的大小也是不准确的,因为是无锁并发,所以随时可能变化。public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
lock-free和wait-free
jdk文档的注释,ConcurrentLInkedQueue是采用wait-free算法实现,和wait-free相关的,还有lock-free,那么这两个又都是什么意思呢?这两个都是属于非阻塞性并发算法。
lock-free:无锁算法,比如在一组并发事务中,一个事务被阻塞了,cpu还能处理其他的事务,并不会占用cpu时间,从而确保cpu一直工作,提高处理吞吐量,但是会导致部分时延。
wait-free:通常在多核系统中,每个cpu都能分别处理一个并发事务。由于其对时延要求高,一般不会阻塞事务。在牺牲部分处理吞吐量的情况下,保证事务在一段时间内完成。linux系统内核(Linux kernel )就是一个很典型的wait-free算法实例。可以这样理解,wait-free,就是保证你的操作在一定时间内能够完成。
参考资料:
1. JDK1.8
2. http://ifeve.com/concurrentlinkedqueue/
3. https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/index.html
4. https://rethinkdb.com/blog/lock-free-vs-wait-free-concurrency/
相关文章推荐
- java多线程-专题-聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- Java并发编程-并发队列(ConcurrentLinkedQueue)的原理分析
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- Java并发中的ConcurrentLinkedQueue源码分析
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- Java并发包探秘 (一) ConcurrentLinkedQueue
- [Java 基础] 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- 聊聊并发——ConcurrentLinkedQueue的实现原理分析
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- Java并发容器之非阻塞队列ConcurrentLinkedQueue
- 聊聊并发(六)――ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- Java并发包探秘 (一) ConcurrentLinkedQueue
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法 在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。