JDK容器与并发—Queue—ConcurrentLinkedQueue
2016-08-28 10:28
429 查看
概述
基于单链表的无界队列,线程安全。1)FIFO;
2)wait-free;
3)迭代器弱一致性;
4)size()非固定时间,由于异步特新,若遍历过程有修改,则可能不正确;批量操作addAll、removeAll、retainAll、containsAll、equals、toArray不保证原子性;
5)遵守内存一致性原则;
数据结构
基于单链表:private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } /** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node<E> tail;
构造器
// 无参构造 public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } // 带Collection参数构造 public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); // volatile t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
增删查
增
// 将元素入队 public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p是最后一个元素 if (p.casNext(null, newNode)) { // newNode入队成功,从head可达 if (p != t) // 入队两个节点维护一次tail casTail(t, newNode); // 其他线程有维护tail return true; } // 有其他线程先入队成功,重读next } else if (p == q) // 遍历到的p节点已删除, // 如果实际tail为当前局部变量tail,说明tail已在head前,需要跳到head进入正常遍历; // 否则,有其他线程维护过tail,从tail开始 p = (t != (t = tail)) ? t : head; else // 跟着tail,每隔两个节点更新局部变量t,向前推进 p = (p != t && t != (t = tail)) ? t : q; } }
删
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // CAS null成功,则说明p已从队列中删除 if (p != h) // 每删除两个节点维护一次head updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 到达队尾 updateHead(h, p); return null; } else if (p == q) // p为已删除节点,且已经off-list,重新开始 continue restartFromHead; else p = q; } } }
查
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
迭代器
private class Itr implements Iterator<E> { /** * Next node to return item for. */ private Node<E> nextNode; /** * nextItem holds on to item fields because once we claim * that an element exists in hasNext(), we must return it in * the following next() call even if it was in the process of * being removed when hasNext() was called. */ private E nextItem; /** * Node of the last returned item, to support remove. */ private Node<E> lastRet; Itr() { advance(); } /** * Moves to next valid node and returns item to return for * next(), or null if no such. */ private E advance() { lastRet = nextNode; E x = nextItem; Node<E> pred, p; if (nextNode == null) { p = first(); pred = null; } else { pred = nextNode; p = succ(nextNode); } for (;;) { if (p == null) { nextNode = null; nextItem = null; return x; } E item = p.item; if (item != null) { nextNode = p; nextItem = item; return x; } else { // skip over nulls Node<E> next = succ(p); if (pred != null && next != null) pred.casNext(p, next); p = next; } } } public boolean hasNext() { return nextNode != null; } public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } public void remove() { Node<E> l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.item = null; lastRet = null; } }
特性
wait-free.相关文章推荐
- 深入浅出 Java Concurrency (20): 并发容器 part 5 ConcurrentLinkedQueue
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- 从并发容器ConcurrentLinkedQueue看解决并发问题的设计思路
- Java并发容器之ConcurrentLinkedQueue
- 【死磕Java并发】-----J.U.C之Java并发容器:ConcurrentLinkedQueue
- JDK容器与并发—Queue—LinkedTransferQueue
- JDK并发工具类源码学习系列——ConcurrentLinkedQueue
- JDK容器与并发—Queue—LinkedBlockingQueue
- Java并发容器——ConcurrentLinkedQueue
- 【容器】并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- Java 并发容器和框架--ConcurrentLinkedQueue
- Java并发容器之非阻塞队列ConcurrentLinkedQueue
- 并发基础_11_并发_容器_ConcurrentLinkedQueue
- Java并发容器ConcurrentHashMap、ConcurrentLinkedQueue、BlockingQueue等
- Java并发容器:ConcurrentLinkedQueue
- 第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法