Java并发编程札记-(五)JUC容器-05ArrayBlockingQueue与LinkedBlockingQueue
2017-12-29 16:26
375 查看
今天来学习ArrayBlockingQueue与LinkedBlockingQueue。
ArrayBlockingQueue是一个基于数组的有界阻塞队列。“有界”表示数组容量是固定的。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
属性
以上是与队列相关的属性。下面是与并发控制相关的属性。
enqueue(E x)方法源码如下:
将元素添加到队列之前,必须先获得独占锁。加锁后,若发现队列已满,返回false。(为什么这里没有调用notFull.await()方法?)将元素插入到队列后,调用notEmpty.signal()唤醒notEmpty上的等待线程。
take()
dequeue()方法如下
将元素添从队列中移除之前,必须先获得独占锁。加锁后,若发现队列为空,调用notEmpty.await(),使线程在notEmpty上等待。如果队列不为空,将元素添从队列中移除,然后调用notFull.signal()唤醒notFull的等待线程。
LinkedBlockingQueue是一个基于单向链表的、可指定大小的阻塞队列。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
节点类
属性
以上属性透漏出一个重要信息:插入和获取操作使用了不同的锁。
offer(E e)
enqueue(Node)源码如下
signalNotEmpty()源码如下
注意notEmpty是与takeLock相关联的,必须先获取takeLock锁,再调用notEmpty.signal()。
take()
dequeue()源码如下:
本文就讲到这里,想了解Java并发编程更多内容请参考:
Java并发编程札记-目录
ArrayBlockingQueue是一个基于数组的有界阻塞队列。“有界”表示数组容量是固定的。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
属性
/** 队列中的数据 */ final Object[] items; /** 下个要删除的项的索引(take, poll, peek ,remove方法使用) */ int takeIndex; /** 下个插入的位置(put, offer, add方法使用) */ int putIndex; /** 队列中元素的数量 */ int count;
以上是与队列相关的属性。下面是与并发控制相关的属性。
/** 锁 */ final ReentrantLock lock; /** 不空的condition */ private final Condition notEmpty; /** 不满的condition */ private final Condition notFull;
核心方法
offer(E e)public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; //获取独占锁 lock.lock(); try { // 如果队列已满,则返回false。 if (count == items.length) return false; else { // 如果队列未满,则插入e,并返回true。 enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } }
enqueue(E x)方法源码如下:
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //将x添加到队列中 items[putIndex] = x; //如果队列已满,则将[下一个被添加元素的索引]置为0 if (++putIndex == items.length) putIndex = 0; count++; //唤醒notEmpty上的等待线程 notEmpty.signal(); }
将元素添加到队列之前,必须先获得独占锁。加锁后,若发现队列已满,返回false。(为什么这里没有调用notFull.await()方法?)将元素插入到队列后,调用notEmpty.signal()唤醒notEmpty上的等待线程。
take()
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; < e28b span class="hljs-comment">//获取锁,若当前线程是中断状态,则抛出InterruptedException异常 lock.lockInterruptibly(); try { //如果队列为空,则一直等待。 while (count == 0) notEmpty.await(); //取出元素并返回 return dequeue(); } finally { //释放锁 lock.unlock(); } }
dequeue()方法如下
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒在notFull上等待的线程 notFull.signal(); return x; }
将元素添从队列中移除之前,必须先获得独占锁。加锁后,若发现队列为空,调用notEmpty.await(),使线程在notEmpty上等待。如果队列不为空,将元素添从队列中移除,然后调用notFull.signal()唤醒notFull的等待线程。
LinkedBlockingQueue是一个基于单向链表的、可指定大小的阻塞队列。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
节点类
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
属性
/** 容量。初始化LinkedBlockingQueue时需要指定,如果不指定则默认为Integer.MAX_VALUE if none */ private final int capacity; /** 链表的实际大小 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表的头结点 * 以下表达式一直成立: head.item == null */ transient Node<E> head; /** * 链表的尾节点 * 以下表达式一直成立: last.next == null */ private transient Node<E> last; /** 获取操作的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 获取操作的等待队列condition */ private final Condition notEmpty = takeLock.newCondition(); /** 插入操作的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 插入操作的等待队列condition */ private final Condition notFull = putLock.newCondition();
以上属性透漏出一个重要信息:插入和获取操作使用了不同的锁。
offer(E e)
/** * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) * 在成功时返回 true,如果此队列已满,则返回 false。 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //如果当前没有空间可用,则返回 false。 if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //加锁 putLock.lock(); try { //再次判断队列是否已满 if (count.get() < capacity) { //插入元素到队列 enqueue(node); c = count.getAndIncrement(); //如果插入元素后,元素仍未满,唤醒在notFull上的等待线程 if (c + 1 < capacity) notFull.signal(); } } finally { //释放所 putLock.unlock(); } //如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程 if (c == 0) signalNotEmpty(); return c >= 0; }
enqueue(Node)源码如下
private void enqueue(Node<E> node) { last = last.next = node; }
last = last.next = node;执行后last.next的值为null?
signalNotEmpty()源码如下
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
注意notEmpty是与takeLock相关联的,必须先获取takeLock锁,再调用notEmpty.signal()。
take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取锁,若当前线程是中断状态,则抛出InterruptedException异常 takeLock.lockInterruptibly(); try { //若队列为空,则一直等待。 while (count.get() == 0) { notEmpty.await(); } //取出元素 x = dequeue(); //取出元素之后,返回原始的节点数量,然后节点数量-1。 c = count.getAndDecrement(); //如果取出元素后,队列不为空,则唤醒在notEmpty上的等待线程 if (c > 1) notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } //如果在获取节点之前队列为满;则取出节点后,唤醒notFull上的等待线程 if (c == capacity) signalNotFull(); return x; }
dequeue()源码如下:
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
本文就讲到这里,想了解Java并发编程更多内容请参考:
Java并发编程札记-目录
相关文章推荐
- 05.JUC 集合 - LinkedBlockingQueue
- 源码解析关于java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue
- java并发编程(二十四)----(JUC集合)ArrayBlockingQueue和LinkedBlockingQueue介绍
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java 容器源码分析之ArrayBlockingQueue和LinkedBlockingQueue
- 并发队列ConcurrentLinkedQueue和阻塞栈LinkedBlockingQueue用法和阻塞队列ArrayBlockingQueue
- BlockQueue中ArrayBlockingQueue和LinkedBlockingQueue
- ArrayBlockingQueue和LinkedBlockingQueue的使用
- Java LinkedBlockingQueue和ArrayBlockingQueue分析
- juc中ArrayBlockingQueue为什么出入用同一个锁?
- Java LinkedBlockingQueue和ArrayBlockingQueue分析
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- ArrayBlockingQueue与LinkedBlockingQueue源码分析笔记
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- ArrayBlockingQueue跟LinkedBlockingQueue的区别
- 04.JUC 集合 - ArrayBlockingQueue
- Java并发之BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)
- Java集合类(十九)JUC中的集合--ArrayBlockingQueue