并发-ConcurrentLinkedQueue-源码解读
2019-05-13 15:18
183 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_20667511/article/details/90174006
/**
* Returns the first live (non-deleted) node on list, or null if none.
* This is yet another variant of poll/peek; here returning the
* first node, not element. We could make peek() a wrapper around
* first(), but that would cost an extra volatile read of item,
* and the need to add a retry loop to deal with the possibility
* of losing a race to a concurrent poll().
*/
其中这段话中,提到了 peek() 和 poll(). 这两个方法,执行的效率没有
/
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
*
介绍
在并发编程中,我们可能经常需要用到线程安全的队列,java为此提供了两种模式的队列:阻塞队列和非阻塞队列。
阻塞队列和非阻塞队列如何实现线程安全? 1. 阻塞队列可以用一个锁(入队和出队共享一把锁)或者两个锁(入队使用一把锁,出队使用一把锁)来实现线程安全,JDK中典型的实现是BlockingQueue; 2.非阻塞队列可以用循环CAS的方式来保证数据的一致性,来达到线程安全的目的。
接下来我们就来看看JDK是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的
测试案例
线上测试案例,从直观的角度来说,看看效果
package cn.zwc.demo; import java.util.concurrent.ConcurrentLinkedQueue; /** * offer(E e) 将指定元素插入此队列的尾部。 poll() 获取并移除此队列的头,如果此队列为空,则返回 null。 peek() 获取但不移除此队列的头;如果此队列为空,则返回 null remove(Object o) 从队列中移除指定元素的单个实例(如果存在),只移除第一个出现的 size() 队列的大小 isEmpty 判断当前队列是否为空 * @author zwc * @date 2019年5月13日 下午1:49:15 */ public class ConcurrentLinkedDequeTest { public static void main(String[] args) { ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); boolean offer = queue.offer(2); queue.offer(3); queue.offer(4); queue.offer(3); System.out.println(queue.remove(3)); System.out.println("队列中 的大小是:"+ queue.size()); System.out.println("插入成功值" + offer +"\t" + queue.peek()); System.out.println("队列是否为空" + queue.isEmpty()); System.out.println("取出队首的值"+queue.poll()); System.out.println("队列是否为空" + queue.isEmpty()); // System.out.println(queue.poll()); // queue.add(1); // queue.add(2); // queue.add(3); // System.out.println(queue.peek()); // System.out.println(queue.size()); // System.out.println(queue.size()); } }
源码分析
1继承关系
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable
2 局部变量定义
// A node from which the first live (non-deleted) node (if any) can be reached in O(1) time. private transient volatile Node<E> head; // A node from which the last node on list (that is, the unique node with node.next == null) can be reached in O(1) time. private transient volatile Node<E> tail;
3.构造函数
/** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
直接将集合转换成队列,通过
ConcurrentLinkedQueue的构造函数
public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; }
4.方法
add()该方法,底层实现是
offer()
public boolean add(E e) { return offer(e); }
offer()这个方法很有意思,添加指定元素到队列的队尾,因为这个队列是的大小是无限制的,所以只会返回true,不会返回false。
// Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never return {@code false}. public boolean offer(E e) { // 如果该元素为空的,会抛出异常的 checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
poll()方法,取出队列的头节点(删除该节点)
public E poll() { // goto的标记 restartFromHead: // 死循环 for (;;) { for (Node<E> h = head, p = h, q;;) { // 保存队头节点 E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); // 返回该队列的头结点 return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
peek()查看队头节点(不删除该节点)
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; 3ff7 else p = q; } } }
first(),获取到第一个节点,而非元素,这个有别于
poll/peek(这两个方法获取到的是元素)
/**
* Returns the first live (non-deleted) node on list, or null if none.
* This is yet another variant of poll/peek; here returning the
* first node, not element. We could make peek() a wrapper around
* first(), but that would cost an extra volatile read of item,
* and the need to add a retry loop to deal with the possibility
* of losing a race to a concurrent poll().
*/
其中这段话中,提到了 peek() 和 poll(). 这两个方法,执行的效率没有
first()高
Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
isEmpty(),队列为空的判断,在判断队列是否为空的情况下,isEmpty()的执行效率高于size()> 0 的判断
public boolean isEmpty() { return first() == null; }
size(),获取到队列的长度,该方法不能用于在高并发的情况下。该方法很消耗性能,获取到队列的长度,是通过遍历整个队列。
/
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
*
Beware that, unlike in most collections, this method is
* NOT a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
* Additionally, if elements are added or removed during execution
* of this method, the returned result may be inaccurate. Thus,
* this method is typically not very useful in concurrent
* applications.
*
* @return the number of elements in this queue
*/
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
contains判断某一个元素是否包含在队列中
public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true; } return false; }
remove()删除队列中第一次出现的元素,删除成功,返回true
public boolean remove(Object o) { if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null)) { Node<E> next = succ(p); if (pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; }
相关文章推荐
- ConcurrentLinkedQueue源码解读
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- Java并发容器之ConcurrentLinkedQueue源码分析
- 并发编程之 ConcurrentLinkedQueue 源码剖析
- J.U.C并发框架源码阅读(十)ConcurrentLinkedQueue
- JDK并发工具类源码学习系列——ConcurrentLinkedQueue
- Java并发中的ConcurrentLinkedQueue源码分析
- 并发编程之 ConcurrentLinkedQueue 源码剖析
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- 生产者与消费者的快速实现——并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- Java并发学习(二十)-ConcurrentLinkedQueue分析
- 聊聊并发(6)ConcurrentLinkedQueue的实现原理分析
- 深入浅出 Java Concurrency (20): 并发容器 part 5 ConcurrentLinkedQueue
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用
- 学习笔记之ConcurrentLinkedQueue源码分析整理
- java挑战高并发(14): LinkedBlockingQueue和ConcurrentLinkedQueue的区别及用法
- Java 并发 --- 阻塞队列之LinkedTransferQueue源码分析
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors