并发容器学习—ArrayBlockingQueue
2019-05-02 10:44
190 查看
一、ArrayBlockingQueue并发容器
1.ArrayBlockingQueue的继承体系
见名知义,ArrayBlockingQueue是个由数组支持的有界阻塞队列。也遵循先进先出(FIFO)的原则,也就是说ArrayBlockingQueue的容量是有限的,并不能像ArrayList那样自动扩容。ArrayBlockingQueue的继承关系如下图所示:
其中除了BlockingQueue接口未接触过,其余的类和接口都已经分析过,不在赘言。
public interface BlockingQueue<E> extends Queue<E> { //将指定的元素添加到队尾,若队列容量已满,则抛异常 boolean add(E e); //将指定的元素添加到队尾,若队列容量已满,那么放弃添加,返回false boolean offer(E e); //将指定的元素添加到队尾,若队列已满,那么等待队列有容量时在添加 //可被中断 void put(E e) throws InterruptedException; //在一定时间内尝试将指定元素添加到队尾,若超过指定时间仍未添加成功, //则放弃添加,返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //若队列不为空,则获取并移除队首,若队列为空,则等待队列有元素可用 //可被中断 E take() throws InterruptedException; //在一定时间内尝试获取并移除队首元素,若超过指定时间仍无可用元素, //则返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回队列的剩余容量,不包含阻塞中的元素 int remainingCapacity(); //删除队列中的元素o boolean remove(Object o); //判断队列中是否含有元素o public boolean contains(Object o); //将队列中的所有元素转移到容器c中,队列中不留任何元素 int drainTo(Collection<? super E> c); //将队列中最多maxElements个元素删除并转移到容器c中 int drainTo(Collection<? super E> c, int maxElements); }
2.重要的属性及构造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //底层数据存储的数组 final Object[] items; //队首元素在数组中的索引 int takeIndex; //队尾元素在数组中的索引 int putIndex; //队列中元素的个数 int count; //重入锁,用于同步操作 final ReentrantLock lock; //条件,允许出队的条件 private final Condition notEmpty; //允许入队的条件 private final Condition notFull; //迭代器 transient Itrs itrs = null; //创建一个容量为capacity的默认队列(不公平的队列) public ArrayBlockingQueue(int capacity) { this(capacity, false); } //创建一个容量为capacity的队列,是否公平由fair决定 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //数组大小 lock = new ReentrantLock(fair); //公平或不公平锁 notEmpty = lock.newCondition(); //出队条件 notFull = lock.newCondition(); //入队条件 } //创建一个容量为capacity的队列,是否公平由fair决定,且队列中 //包含有容器c中的元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { int i = 0; try { for (E e : c) { checkNotNull(e); //元素不能为null items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } }
3.入队过程
在ArrayBlockingQueue中入队的方法有三个:add、offer和put,都是将元素添加到队尾,我们逐个来看:
//添加元素到队尾,若队列没有剩余容量,则放弃添加 public boolean offer(E e) { //入队元素不能为null,这里说明ArrayBlockingQueue中不允许有null元素存在 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { //判断队列中的元素个数是否超过队列容量 //若队列没有剩余容量存放元素,那么就放弃添加 if (count == items.length) return false; else { enqueue(e); //入队方法 return true; } } finally { lock.unlock(); //解锁 } } /** * 入队方法 * ArrayBlockingQueue的入队和出队操作是典型的生产者消费者模式的使用 * ArrayBlockingQueue也是个典型生产者消费者队列,当队列中没有元素(产品) * 时,会挂起或拒绝所有的出队(消费)操作;当队列中的元素满了时,会挂起或拒 * 绝所有入队(生产)操作。而当队列中有元素时,则会唤醒或接受所有出队操作 * 同理,队列中还有空间时,则会唤醒或接受所有入队操作 */ private void enqueue(E x) { //获取底层数组引用 final Object[] items = this.items; items[putIndex] = x; //将元素添加到队尾 //判断队尾索引是否超过数组的长度 /** * ArrayBlockingQueue中底层数组采用环形数组来 * 实现数组的重复利用,putIndex永远指向队尾元素的索引 * 因此当putIndex的值等于数组长度时,说明已经到达数组的 * 最大索引,需要回到0的索引位置了 */ if (++putIndex == items.length) putIndex = 0; count++; //元素个数+1 notEmpty.signal(); //唤醒执行出队操作的线程 } //添加元素到队尾,若队列没有剩余容量,则抛出异常 public boolean add(E e) { return super.add(e); //父类方法 } //父类中的add方法 public boolean add(E e) { //执行子类的offer添加方法判断添加是否成功 if (offer(e)) return true; else //添加失败抛出异常 throw new IllegalStateException("Queue full"); } //在一定时间内尝试将指定元素添加到队尾,若超过指定时间仍未添加成功, //则放弃添加,返回false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); //不允许添加null long nanos = unit.toNanos(timeout); //时间换算 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被打断的加锁 try { //循环判断队列是否已满 /** * 循环判断是为防止‘虚假唤醒’,即有多个入队操作线程被唤醒 * 但若此时只有一个剩余容量,那么这多个入队操作只有一个应该被执行 * 其他的入队操作仍应该继续等待,如果使用if来进行一次性的判断 * 那么就会造成在只有一个一个剩余容量的情况下,执行多次入队操作 * 这是不允许的,因此应该使用while来继续判断队列是否已满,防止出现 * 虚假唤醒。 */ while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); //等待nanos时间 } enqueue(e); return true; } finally { lock.unlock(); //解锁 } } //将指定的元素添加到队尾,若队列已满,那么等待队列有容量时在添加 //可被中断 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被中断的加锁 try { //判断队列是否已满,若是队列已满, //那么执行入队操作的当前线程进入条件队列(条件 //队列指的是重入锁底层AQS同步器中的线程条件队列, //不是当前的元素队列)中等待,只有队列有剩余空间时 //才会被唤醒尝试入队,这里也存在虚假唤醒的可能, //因此也需要使用while对唤醒的线程进行在一次的判断 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
4.出队过程
在ArrayBlockingQueue中的出队方法有poll和take方。
//将队首元素移除出队并返回,若队列为空,则返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //真正执行出队操作的方法 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //获取队尾元素 items[takeIndex] = null; //将队尾元素从数组中移除 //判断队尾元素的索引takeIndex是否超过数组的边界 //超过数组索引最大值,则回到数组的索引起点0 if (++takeIndex == items.length) takeIndex = 0; count--; //队列中元素-1 //判断是否使用了队列的迭代器功能 if (itrs != null) itrs.elementDequeued(); //整理迭代器 //执行了一个出队操作,队列必不可能满,那么可以唤醒一个入队线程 notFull.signal(); return x; } //内部类Itrs中的整理迭代器结点的方法 void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) //队列中没有元素时,需要清空所有迭代器 queueIsEmpty(); else if (takeIndex == 0) //队尾索引指向索引0时需要将数组的循环次数+1 //然后对所有的迭代器进行清理,清除耗尽了的迭代器 takeIndexWrapped(); } //在一定时间内尝试获取并移除队首元素,若超过指定时间仍无可用元素, //则返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可被中断的加锁 try { //判断队列是否为空,若队列为空则等待一定时间 while (count == 0) { if (nanos <= 0) //判断等待时间是否合法 return null; nanos = notEmpty.awaitNanos(nanos); //等待一定时间 } return dequeue(); } finally { lock.unlock(); } } //移除并返回队首元素,若队列中没有剩余空间,那当前 //出队操作的线程进入AQS条件等待队列中等待条件满足被唤醒 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空,进入等待,线程挂起 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
5.peek方法
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { return itemAt(takeIndex); //返回索引处的元素 lock.unlock(); } } //获取数组i索引的元素 final E itemAt(int i) { return (E) items[i]; }
6.remove方法
public boolean remove(Object o) { //队列中不存在null元素,要删除null,直接失败 if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //判断队列是否是空队列,若是空队列那么 //就不需要去队列中查找,直接删除失败 //否则,再去队列中查找 if (count > 0) { final int putIndex = this.putIndex; //获取队尾索引 int i = takeIndex; //获取队首索引 //遍历队首到队尾直接的所有元素,查找是否存在要删除的元素 do { if (o.equals(items[i])) { removeAt(i); //找到要删除的元素,执行删除方法 return true; } if (++i == items.length) //判断索引是否越界,越界归零 i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } //将索引处元素从数组中移除,并整理数组 void removeAt(final int removeIndex) { final Object[] items = this.items; //若要删除的元素索引正好是队尾元素,那么直接删除替换成null即可 if (removeIndex == takeIndex) { items[takeIndex] = null; //删除元素 if (++takeIndex == items.length) //索引越界归零 takeIndex = 0; count--; //元素个数-1 if (itrs != null) //整理迭代器 itrs.elementDequeued(); } else { //要删除的元素索引不为队尾元素,那么删除的元素空出来的位置需要将后续 //到队尾的元素全都索引+1 final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) //下个索引是否越界,越界归零 next = 0; //判断移动数组中索引是否到达队尾 //不是队尾索引的话,将下个索引中元素移到当前索引即可 //到达队尾索引,则要赋为null,并且队尾索引要改变成当前索引 if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; //元素个数-1 if (itrs != null) //整理迭代器 itrs.removedAt(removeIndex); } notFull.signal(); //唤醒入队操作 }
7.size方法
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; //队列元素个数 } finally { lock.unlock(); } }
相关文章推荐
- java并发编程-同步类容器-ArrayBlockingQueue
- Java 7之多线程并发容器 - ArrayBlockingQueue
- Thread学习(九) 并发的Queen学习ArrayBlockingQueue,LinkedBlockingQueue
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- Java并发学习笔记(七)-ArrayBlockingQueue
- Java并发学习(二十二)-ArrayBlockingQueue分析
- 并发容器学习—LinkedBlockingQueue和LinkedBlockingDueue
- 并发容器学习—DelayQueue与PriorityBlockingQueue
- Java并发容器之ArrayBlockingQueue
- 并发容器分析(二)--ArrayBlockingQueue
- ( 十)并发包阻塞队列之ArrayBlockingQueue
- Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析
- 并发容器学习—LinkedTransferQueue
- JDK并发工具类源码学习系列——PriorityBlockingQueue
- JAVA并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- 并发队列 – 有界阻塞队列 ArrayBlockingQueue 原理探究
- JDK容器与并发—Queue—PriorityBlockingQueue
- Java多线程与并发应用-(10)-java阻塞队列实现ArrayBlockingQueue
- 详细分析Java并发集合ArrayBlockingQueue的用法
- 移动端并发编程基础篇-阻塞队列ArrayBlockingQueue&LinkedBlockingQueue