您的位置:首页 > 产品设计 > UI/UE

Java并发学习(二十)-ConcurrentLinkedQueue分析

2017-12-25 00:00 645 查看
前一篇文章学习了ConcurrentHashMap,线程安全的HashMap:Java并发学习(十九)-Java8中ConcurrentHashMap分析

此时,有时会需要一种线程安全的队列,它也是Concurrent家族的一员。

What is ConcurrentLinkedQueue

从名字可以大概猜到,是一个线程安全的队列。现在来想想,里面是怎么实现的呢?有以下几种方案:

使用synchronized来加锁,push以及pop时候加上锁,这样就线程安全了。

使用ReentrantLock等AQS锁来加锁,同样是push和pop时候上锁。

使用CAS方式实现push和pop方法。

当然前两种方法,很容易想到,原理就是加锁,这样的方法称为阻塞式的方法。而第三种方法使用循环CAS的方法则是非阻塞式的,Doug Lea大佬就是用的第三种方法实现了一个线程安全的队列,说起来简单,其内部设计并不像说起来这么简单的。

特性

总览一下ConcurrentLinkedQueue的特性:

使用 CAS 原子指令来处理对数据的并发访问,替换next等节点都是CAS方式,这是非阻塞算法得以实现的基础。

head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键,为什么是关键呢?比如你要在队尾增加一个元素。原来情况,你需要做2件事,一是把新元素链接上去,二是更改tail的指向,现在你只需要做第一件事了。

FIFO,先入先出。

当然一切,都是由head和tail的节点特性保证。

接下来看head和tail节点的特性:

head节点:

如果节点未删除,则从head可以通过succ()方法遍历达到

head 不能为 null

head 节点的 next 域不能引用到自身

head 节点的 item 域可能为 null,也可能不为 null

tail节点:

tail不能为null

通过 tail 调用 succ() 方法,最后节点总是可达的

tail 节点的 next 域可以引用到自身

tail 节点的 item 域可能为 null,也可能不为 null

接下来具体分析几个重要方法。

offer()

在ConcurrentLinkedQueue中,offer方法,用于往队列尾端添加一个元素。永远只会返回
true


先看它的源代码:

public boolean offer(E e) {
//检查是否为null
checkNotNull(e);
//实例化一个node
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {//自旋式,直到成功
//p表示尾节点,默认等于tail节点
Node<E> q = p.next;
if (q == null) {   //q为null,说明p是最后一个节点,tail指向了最后一个节点。
if (p.casNext(null, newNode)) {        //尝试在将newNode设置为p的next节点。
if (p != t)  //检测p是不是tail,不是的话,说明有线程瞬间也成功了,更换tail节点。
/*被别的节点抢先也没有关系,这里不管有没有把
newNode设为tail节点,最终会执行下面的return true。
所以tail可能不是指向最后一个节点。
*/
casTail(t, newNode);
return true;
}
//插入失败,肯定就是被其他线程插入成功了。但是自己的一定要成功才能返回。
}
else if (p == q)
//被人抢先了,tail已经改变了,所以p也要改变。如果在比较时候,仍然被人抢先,那就调到head重新走一遍。
p = (t != (t = tail)) ? t : head;
else
//p是t节点,但是实际上tail已经被改变了。
p = (p != t && t != (t = tail)) ? t : q;
}
}


这样一种典型情况:

1. 两个或多个线程同时走到第10行。

2. 最终结果是只有一个成功,因为CAS是原子性的,并且是和p.next对比,只有p.next==null,才会成功。

3. 假设最终线程A成功了,然后它尝试去更新尾节点,这里允许失败。

4. 一个新的线程要去执行offer方法增加节点,它会判断出这个tail不是真tail,因为tail.next !=null,它不会执行第10行那一段if语句里面的,会去寻找,直到找到新tail然后执行成功返回。如果实在找不到,可以通过head,利用succ方法一路找(head不变性)

poll()

出队方法,看它的源代码:

public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;   //获取p节点元素。

if (item != null && p.casItem(item, null)) { //item不为null,所以证明p还没有被弹出。因为要弹出head,所以把这里的item设为null。
//成功了
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);   //决定是用p还是用p.next来替换头节点。
//updateHead允许失败,失败的话,head.item=null,但是head不为null。
return item;
}
else if ((q = p.next) == null) {    //如果只有一个头节点。说明队列已经空了。
updateHead(h, p);
return null;
}
else if (p == q)
//重来一遍。
continue restartFromHead;
else
//如果下一个元素不为null,则把下一个元素设为头节点
p = q;
}
}
}


还是假设一个并发场景:

1. 多个线程尝试弹出一个元素,都会执行这一句:
item != null && p.casItem(item, null)
,验证item是否为null,不为null才继续执行CAS操作。也就是说,如果此时有一个线程成功把head.item设为null了,其他线程将无法完成CAS操作。

2. 当成功设item为null后,只需要尝试性将head节点设置下,允许失败,最终都会返回item。

3. 如果此后其他线程来到poll方法,则如果队列不为null,前三个判断都会执行失败,执行第22行,直接会把下一个节点设置为头节点。相应的item也会变化,就可以重新进入循环开始自己线程的操作了。

size()

和其他的并发容器的求和方法一样,ConcurrentLinkedQueue的大小也是不准确的,因为是无锁并发,所以随时可能变化。

public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}


lock-free和wait-free

jdk文档的注释,ConcurrentLInkedQueue是采用wait-free算法实现,和wait-free相关的,还有lock-free,那么这两个又都是什么意思呢?

这两个都是属于非阻塞性并发算法。

lock-free:无锁算法,比如在一组并发事务中,一个事务被阻塞了,cpu还能处理其他的事务,并不会占用cpu时间,从而确保cpu一直工作,提高处理吞吐量,但是会导致部分时延。

wait-free:通常在多核系统中,每个cpu都能分别处理一个并发事务。由于其对时延要求高,一般不会阻塞事务。在牺牲部分处理吞吐量的情况下,保证事务在一段时间内完成。linux系统内核(Linux kernel )就是一个很典型的wait-free算法实例。可以这样理解,wait-free,就是保证你的操作在一定时间内能够完成。

参考资料:

1. JDK1.8

2. http://ifeve.com/concurrentlinkedqueue/

3. https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/index.html

4. https://rethinkdb.com/blog/lock-free-vs-wait-free-concurrency/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐