您的位置:首页 > 产品设计 > UI/UE

JAVA多线程之——ConcurrentLinkedQueue

2017-04-04 18:17 351 查看

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个非阻塞的无界队列。非阻塞和阻塞区别。首先了解一下JAVA中多线程的同步机制基本采用三种方式:

volatile 轻量级的线程同步,不会引起上下文的切换和线程调度,提供内存的可见性,但不保证原子性。

CAS 轻量级的线程同步,不会引起上下文的切换和线程调度,提供内存的可见性和原子性。

内部锁(synchronized)和显示锁 重量级的线程同步,可能会引起上下文的切换和下城调度,提供内存的可见性和原子性。

非阻塞算法

一个线程的失败和挂起不会引起其他些线程的失败和挂起,这样的算法称为非阻塞算法。非阻塞算法通过使用底层机器级别的原子指令来取代锁,从而保证数据在并发访问下的一致性。

ConcurrentLinkedQueue就是采用CAS为基础来实现非阻塞算法。但是非阻塞算法的设计和实现都非常复杂,这里只能够大致学习以及分析一下ConcurrentLinkedQueue的一个实现思路。

我们都知道链表是基于节点实现,所以先学习一下ConcurrentLinkedQueue中的节点:

Node

private static class Node<E> {
volatile E item; //节点的域值
volatile Node<E> next; //当前节点的下一个节点
Node(E item) {
UNSAFE.putObject(this, itemOffset, item); //创建新节点。初始化其域值
}

boolean casItem(E cmp, E val) {//用 CAS 指令设置 item 域的值
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {//惰性设置 next 域的值
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {//CAS 设置 next 域的值
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}


了解节点后,先来看一下ConcurrentLinkedQueue的一些基本属性定义

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}


队列有一个head节点,tail节点,在初始化一个队列的时候,head节点和tail节点都是指向一个域值为空的节点。

offer方法

public boolean offer(E e) {
checkNotNull(e);//检查元素是否为空。空则抛出NullPointerException
final Node<E> newNode = new Node<E>(e);//创建一个新节点
for (Node<E> t = tail, p = t;;) {//死循环。定义两个节点,一个节点指向tail,一个节点指向t
Node<E> q = p.next;
if (q == null) {  //说明p是最后一个节点。
if (p.casNext(null, newNode)) { //CAS操作把新节点插入到p节点后
if (p != t)//每插入两个节点更新一次tail
casTail(t, newNode);  // 允许失败
return true;
}
//CAS失败,说明有其它线程已经插入,进行下一次循环
}
else if (p == q)
// 遍历到的p节点已删除,
// 如果实际tail为当前局部变量tail,说明tail已在head前,需要跳到head进入正常遍历;
// 否则,有其他线程维护过tail,从tail开始
p = (t != (t = tail)) ? t : head;
else
// 每隔两个节点更新局部变量t,向前推进
p = (p != t && t != (t = tail)) ? t : q;
}
}


上面代码初次理解可能会比较困难。关键在于理解tail的位置,它不一定是指向最后一个节点的。tail有三种情况,第一种指向尾节点。一种指向尾节点的前一个节点。还有一种就是指向了一个已经删除的节点(p==q)一个节点的下一个节点指向自己,就是说明它已经从链表中断开了。

这三种情况对应着上面的三个判断情况。

poll

public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// CAS null成功,则说明p已从队列中删除
if (p != h) //// 每删除两个节点维护一次head
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)// p为已删除节点,且已经off-list,重新开始
continue restartFromHead;
else
p = q;
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 多线程