您的位置:首页 > 编程语言 > Java开发

JDK容器与并发—Queue—ConcurrentLinkedQueue

2016-08-28 10:28 429 查看

概述

      基于单链表的无界队列,线程安全。

1)FIFO;

2)非阻塞(lock-free):基于CAS重试实现,不使用锁;JDK早期文档称之为"wait-free",但严格来说该算法只保证lock-free;

3)迭代器弱一致性;

4)size()不是常数时间操作,需要遍历整个队列;由于异步特性,若遍历过程中队列被修改,则结果可能不准确;批量操作addAll、removeAll、retainAll、containsAll、equals、toArray不保证原子性;

5)遵守内存一致性原则:一个线程在将元素放入队列之前的操作,happen-before 另一个线程访问或移除该元素之后的操作;

数据结构

      基于单链表:

/**
 * Singly-linked list node holding one element and a reference to its successor.
 *
 * Both fields are declared volatile. The helper methods below write or
 * compare-and-swap them through {@code sun.misc.Unsafe}, using field offsets
 * that the static initializer resolves once.
 */
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    /**
     * Atomically replaces {@code item} with {@code val} if it currently equals {@code cmp}.
     *
     * @return the result of the underlying compare-and-swap
     */
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    /**
     * Stores {@code val} into {@code next} with an ordered (lazy) put rather
     * than a compare-and-swap.
     */
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    /**
     * Atomically replaces {@code next} with {@code val} if it currently equals {@code cmp}.
     *
     * @return the result of the underlying compare-and-swap
     */
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            // Class<?> instead of the raw Class type: same reflective lookup,
            // but no raw-type warning.
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            // Any failure while resolving the offsets is rethrown as an Error,
            // keeping the original exception as the cause.
            throw new Error(e);
        }
    }
}

/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* Invariants:
* - all live nodes are reachable from head via succ()
* - head != null
* - (tmp = head).next != tmp || tmp != head
* Non-invariants:
* - head.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
*   to not be reachable from head!
*
* Both constructors assign this field. poll() and peek() read it once
* per pass and hand that snapshot to updateHead (helper not shown in
* this excerpt).
*/
private transient volatile Node<E> head;

/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* - tail != null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
*   to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*
* Both constructors assign this field. offer() starts its traversal
* from a snapshot of it and calls casTail (helper not shown in this
* excerpt) only when the node it linked onto differs from that snapshot.
*/
private transient volatile Node<E> tail;


构造器

// 无参构造
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

/**
 * Constructor taking a Collection: creates a queue holding the elements of
 * {@code c} in the order its iterator returns them.
 *
 * Every element is passed to checkNotNull before a node is created for it.
 * When {@code c} yields no elements, head and tail share a single dummy
 * node whose item is null.
 *
 * @param c the collection whose elements become the initial queue contents
 */
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> first = null;
    Node<E> last = null;
    for (E element : c) {
        checkNotNull(element);
        Node<E> node = new Node<E>(element);
        if (first != null) {
            // lazySetNext is an ordered put, not a CAS; the chain is still
            // local to this constructor at this point.
            last.lazySetNext(node);
        } else {
            first = node;
        }
        last = node;
    }
    if (first == null) {
        first = last = new Node<E>(null);
    }
    head = first;
    tail = last;
}


增删查

/**
* Appends the given element at the end of the queue.
*
* The loop walks from a snapshot of tail to the node whose next is null
* and tries to CAS the new node onto it. When that CAS loses a race, the
* loop re-reads next and keeps going. The only return statement returns
* true.
*
* @param e element to append; checkNotNull(e) runs first (helper not shown
*          in this excerpt; presumably it throws NullPointerException for null)
* @return true once the new node has been linked
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is the last node
if (p.casNext(null, newNode)) {
// newNode has been linked in, so it is now reachable from head
if (p != t) // tail is maintained only once per two enqueued nodes
casTail(t, newNode);  // a failed CAS is fine: another thread has already moved tail
return true;
}
// another thread linked its node first; loop again and re-read next
}
else if (p == q)
// p points to itself, i.e. it has already been removed from the list.
// If tail still equals the local snapshot t, tail itself lies before head,
// so jump to head and resume the normal traversal from there;
// otherwise another thread has moved tail, so continue from the new tail.
p = (t != (t = tail)) ? t : head;
else
// Move forward; after two hops re-read tail into t and jump to it
// if it has changed, otherwise step to the next node.
p = (p != t && t != (t = tail)) ? t : q;
}
}


/**
* Removes and returns the first element of the queue, or returns null
* when the traversal reaches the last node without claiming an item.
*
* An element is claimed by CASing its node's item from the value just
* read to null. If the current node turns out to point to itself, the
* whole traversal restarts from a fresh read of head.
*
* @return the claimed element, or null if none was found
*/
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
// the CAS to null succeeded, so p has been removed from the queue
if (p != h) // head is maintained only once per two removed nodes
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) { // reached the end of the queue
updateHead(h, p);
return null;
}
else if (p == q)	// p is a removed node that is already off-list; start over
continue restartFromHead;
else
p = q;
}
}
}


/**
* Returns the first element of the queue without clearing it, or null
* when the last node is reached without finding a non-null item.
*
* @return the first non-null item found, or null if there is none
*/
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// stop at the first node holding an item, or at the last node
// (where item is null and null is returned)
if (item != null || (q = p.next) == null) {
// hand the head snapshot and the stopping node to updateHead
// (helper not shown in this excerpt)
updateHead(h, p);
return item;
}
else if (p == q) // p points to itself: restart from a fresh read of head
continue restartFromHead;
else
p = q;
}
}
}


迭代器

/**
* Iterator over the queue's elements.
*
* The iterator always looks one step ahead: nextNode and nextItem are
* filled in by advance() before hasNext() or next() is asked about them,
* so next() hands back the cached item.
*/
private class Itr implements Iterator<E> {
/**
* Next node to return item for.
*/
private Node<E> nextNode;

/**
* nextItem holds on to item fields because once we claim
* that an element exists in hasNext(), we must return it in
* the following next() call even if it was in the process of
* being removed when hasNext() was called.
*/
private E nextItem;

/**
* Node of the last returned item, to support remove.
*/
private Node<E> lastRet;

/**
* Positions the iterator on the first node that holds an item, if any.
*/
Itr() {
advance();
}

/**
* Moves to next valid node and returns item to return for
* next(), or null if no such.
*/
private E advance() {
// remember the node and item that were cached by the previous call
lastRet = nextNode;
E x = nextItem;

Node<E> pred, p;
// nextNode is null only on the call made by the constructor, because
// next() throws before calling advance() when nextNode is null
if (nextNode == null) {
p = first();
pred = null;
} else {
pred = nextNode;
p = succ(nextNode);
}

for (;;) {
if (p == null) {
// no further node: the iterator is exhausted
nextNode = null;
nextItem = null;
return x;
}
E item = p.item;
if (item != null) {
// cache this node and its item for the following next() call
nextNode = p;
nextItem = item;
return x;
} else {
// skip over nulls
Node<E> next = succ(p);
// when both neighbours exist, try to CAS pred.next from p to
// its successor; the result of the CAS is ignored
if (pred != null && next != null)
pred.casNext(p, next);
p = next;
}
}
}

/**
* Reports whether a next element has been cached by advance().
*/
public boolean hasNext() {
return nextNode != null;
}

/**
* Returns the cached element and moves the look-ahead forward.
* Throws NoSuchElementException when no next node is cached.
*/
public E next() {
if (nextNode == null) throw new NoSuchElementException();
return advance();
}

/**
* Clears the item of the node returned by the most recent next() call.
* The node itself stays linked. Throws IllegalStateException when there
* is no such node, i.e. before the first next() call or after remove()
* has already been called for the current element.
*/
public void remove() {
Node<E> l = lastRet;
if (l == null) throw new IllegalStateException();
// rely on a future traversal to relink.
l.item = null;
lastRet = null;
}
}


特性

      非阻塞(lock-free):入队、出队均通过CAS重试完成,不使用锁;JDK早期文档将其称为wait-free。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息