Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析
2017-10-14 21:39
1401 查看
队列是一种FIFO(先进先出)数据结构,在前面我们知道分析过LinkedList的源码,LinkedList可以作为一般的队列使用,既然有阻塞队列,那么肯定就和一般的队列是有不一样的地方,并且使用场景也可能不一样,一起来探究一下阻塞队列的源码。
在看本文之前,要有 AbstractQueuedSynchronizer,ReentrantLock,Condition的知识,下面是我个人分析的博客,仅供参考:
Java 并发 —AbstractQueuedSynchronizer-共享模式与Condition
Java 并发 —ReentrantLock源码分析
阻塞队列是一个支持两个附加操作的队列,这两个附加操作支持阻塞的插入和移除方法。
支持阻塞的插入方法: 意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
ArrayBlockingQueue 实现了BlockingQueue接口,该接口中定义了阻塞的方法接口,
ArrayBlockingQueue 继承了AbstractQueue,具有了队列的行为。
ArrayBlockingQueue 实现了Serializable接口,可以序列化。
相比传统队列,ArrayBlockingQueue 中有三个重要的属性,可重入锁和Condition,可重入锁是独占式的锁,如果用可重入锁来控制对 队列的操作访问,那么此队列将是线程安全的,阻塞操作,那么什么情况下该阻塞,什么情况下不阻塞,这个是由Condition来控制的。
因此通过重入锁和Condition 实现了ArrayBlockingQueue的线程安全和条件阻塞。
notEmpty :表示的是队列不为空,符合这种条件那么可以进行出队操作,否则将会阻塞,直到队列不为空为止。
notFull: 表示是队列没有满,符合这种条件的可以进行入队操作,否则将会阻塞。
ArrayBlockingQueue 因为是有界的队列,因此需要指定队列的大小,它不会像ArrayList那样自动扩容,ReentrantLock 有公平锁和非公平锁之分,因此需要指定,默认是非公平锁,ReentrantLock 我们前面也分析过,对于公平锁,那么会遵循获取锁的FIFO规则,先阻塞的线程先获取锁,对于非公平锁,那么第一次获取锁的时候,会进行抢占式获取锁,也就是不管等待队列中是否有等待线程,同样参与竞争获取锁,如果获取失败,加入到等待队列,那么以后遵循FIFO规则。(注意:这里的等待队列不是指的ArrayBlockingQueue,而是同步器内部的队列)
2、指定集合初始化
处理指定队列大小,锁的公平性,还可以通过一个集合来初始化队列,在队列产生后,会把集合中的元素依次添加到队列中,初始集合的元素大小要和队列的容量一致,同时对队列的操作进行了加锁,保证了线程安全性,在finally 中释放锁,这样能保证就是出现异常,也能正确的释放锁。
1、add(E e)
将指定的元素插入到此队列的尾部,在成功时返回 true,如果此队列已满,则抛出 IllegalStateException
2、offer(E e)
将指定的元素插入到此队列的尾部在成功时返回 true,如果此队列已满,则返回 false。
在前面的add 中,内部调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。
从offer中可以看出来:ArrayBlockingQueue存储的元素是不能为空的,ArrayList和LinkedList可以存储空元素,如果队列满了不会阻塞,直接会返回false。
putIndex指示的是入队元素的存储位置,当队列满后,putIndex=0,可以看出,这是环形队列的用法,队尾不一定要是物理上的队列末尾,而是逻辑上的队尾,通过这种环形队列的用法,可以减少不必要的元素拷贝(元素出队以后,不用把元素整体往前移动),这个可以结合后面出队来看。
3、offer(E e, long timeout, TimeUnit unit)
将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待。
这里在获取锁的时候和前面的不一样,lockInterruptibly 表示可以中断的,前面的加锁对中断不敏感,也就是说,在前面的获取锁的方式中,别的线程对当前线程中断,当前线程不会理会(会记录中断状态),而可中断的获取锁,其它线程中断该线程的时候,会抛出中断异常,举个例子:如果队列满了,但是超时时间还没有到,此时如果不想执行入队操作了,那么就可以中断当前线程(注意:中断不是立即取消,中断后也可能会执行入队操作,参考:Java 并发 —中断机制)。
4、put(E e)
将指定的元素插入此队列的尾部,如果该队列已满,则等待。
当队列满时,notFull 条件不满足,因此会阻塞在该方法上(await()),这里用的while循环,因为当线程从notFull 条件阻塞中唤醒时,需要重新检查(可以此时被其它线程抢先了,导致还是不满足)。
1、poll()
获取并移除此队列的头,如果此队列为空,则返回 null
从这里出队和前面的入队相结合,可以看出队列是一种逻辑环形队列,队”满”(++putIndex == items.length),队”空”(++takeIndex == items.length)并不是严格意义上的队空,队满,我从网上找了一个环形队列的图,稍稍修改了一下:
对于迭代器部分,这里不会讲,后面会专门来分析迭代器。
2、poll(long timeout, TimeUnit unit)
获取并移除此队列的头部,在指定的等待时间前等待。
3、take() :
获取并移除此队列的头部,在元素变得可用之前一直等待
4、peek()
返回队头的元素,但是元素并不会出队。
因为底层是数组结构,因此通过索引可以直接访问到队头。
5、remove(Object o)
从此队列中移除指定元素的单个实例
这种移除队列中的元素的方式,就相当于删除数组中 非两端的元素一样,需要移动数组中的元素,因此这种方式需要相对开销要大点,对于
ArrayBlockingQueue 尽量不要操作非队头和队尾的元素。
ArrayBlockingQueue 线程安全,和Vector不一样,Vector 中用的synchronized 关键字进行线程同步,ArrayBlockingQueue 中通过ReentrantLock来完成的。
ArrayBlockingQueue 中的ReentrantLock 有公平和非公平之分,因此ArrayBlockingQueue 相当于也有公平性和非公平性之分。
ArrayBlockingQueue 比一般的队列多了两个附加操作(阻塞式的插入和移除方法),这个依赖于Condition实现。
ArrayBlockingQueue 是一种逻辑上的环形队列。
ArrayBlockingQueue 在入队和出队上都使用了同一个重入锁,因此入队和出队是不能并发执行的。
在看本文之前,要有 AbstractQueuedSynchronizer,ReentrantLock,Condition的知识,下面是我个人分析的博客,仅供参考:
Java 并发 —AbstractQueuedSynchronizer-共享模式与Condition
Java 并发 —ReentrantLock源码分析
阻塞队列是一个支持两个附加操作的队列,这两个附加操作支持阻塞的插入和移除方法。
支持阻塞的插入方法: 意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
ArrayBlockingQueue介绍(jdk 1.8)
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行操作。继承体系
ArrayBlockingQueue 实现了BlockingQueue接口,该接口中定义了阻塞的方法接口,
ArrayBlockingQueue 继承了AbstractQueue,具有了队列的行为。
ArrayBlockingQueue 实现了Serializable接口,可以序列化。
数据结构
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items ,底层存储元素的数组*/ final Object[] items; /** items index for next take, poll, peek or remove ,队首的索引(出队)*/ int takeIndex; /** items index for next put, offer, or add ,队尾的索引(入队)*/ int putIndex; /** Number of elements in the queue ,队列中元素的个数*/ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access ,重入锁*/ final ReentrantLock lock; /** Condition for waiting takes ,出队的条件*/ private final Condition notEmpty; /** Condition for waiting puts ,入队的条件*/ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null; }
相比传统队列,ArrayBlockingQueue 中有三个重要的属性,可重入锁和Condition,可重入锁是独占式的锁,如果用可重入锁来控制对 队列的操作访问,那么此队列将是线程安全的,阻塞操作,那么什么情况下该阻塞,什么情况下不阻塞,这个是由Condition来控制的。
因此通过重入锁和Condition 实现了ArrayBlockingQueue的线程安全和条件阻塞。
notEmpty :表示的是队列不为空,符合这种条件那么可以进行出队操作,否则将会阻塞,直到队列不为空为止。
notFull: 表示是队列没有满,符合这种条件的可以进行入队操作,否则将会阻塞。
构造方法
1、public ArrayBlockingQueue(int capacity) { //默认使用非公平锁 this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
ArrayBlockingQueue 因为是有界的队列,因此需要指定队列的大小,它不会像ArrayList那样自动扩容,ReentrantLock 有公平锁和非公平锁之分,因此需要指定,默认是非公平锁,ReentrantLock 我们前面也分析过,对于公平锁,那么会遵循获取锁的FIFO规则,先阻塞的线程先获取锁,对于非公平锁,那么第一次获取锁的时候,会进行抢占式获取锁,也就是不管等待队列中是否有等待线程,同样参与竞争获取锁,如果获取失败,加入到等待队列,那么以后遵循FIFO规则。(注意:这里的等待队列不是指的ArrayBlockingQueue,而是同步器内部的队列)
2、指定集合初始化
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
处理指定队列大小,锁的公平性,还可以通过一个集合来初始化队列,在队列产生后,会把集合中的元素依次添加到队列中,初始集合的元素大小要和队列的容量一致,同时对队列的操作进行了加锁,保证了线程安全性,在finally 中释放锁,这样能保证就是出现异常,也能正确的释放锁。
入队
ArrayBlockingQueue提供了诸多方法,可以将元素加入队列尾部1、add(E e)
将指定的元素插入到此队列的尾部,在成功时返回 true,如果此队列已满,则抛出 IllegalStateException
public boolean add(E e) { return super.add(e); } //super.add(e) public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
2、offer(E e)
将指定的元素插入到此队列的尾部在成功时返回 true,如果此队列已满,则返回 false。
在前面的add 中,内部调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { //判断元素是否为空,如果为空,会抛出空指针异常 checkNotNull(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //如果队列已经满了,返回false if (count == items.length) return false; else { //将元素入队 enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } }
从offer中可以看出来:ArrayBlockingQueue存储的元素是不能为空的,ArrayList和LinkedList可以存储空元素,如果队列满了不会阻塞,直接会返回false。
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //如果入队后,队列"满"了,那么putIndex=0 if (++putIndex == items.length) putIndex = 0; count++; //队列不为空满足,唤醒阻塞在队列为空条件上的一个线程。 notEmpty.signal(); }
putIndex指示的是入队元素的存储位置,当队列满后,putIndex=0,可以看出,这是环形队列的用法,队尾不一定要是物理上的队列末尾,而是逻辑上的队尾,通过这种环形队列的用法,可以减少不必要的元素拷贝(元素出队以后,不用把元素整体往前移动),这个可以结合后面出队来看。
3、offer(E e, long timeout, TimeUnit unit)
将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待。
/** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //加锁,可中断 lock.lockInterruptibly(); try { //如果队满,则等待 while (count == items.length) { //超时,返回false if (nanos <= 0) return false; //(未超时)等待 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
这里在获取锁的时候和前面的不一样,lockInterruptibly 表示可以中断的,前面的加锁对中断不敏感,也就是说,在前面的获取锁的方式中,别的线程对当前线程中断,当前线程不会理会(会记录中断状态),而可中断的获取锁,其它线程中断该线程的时候,会抛出中断异常,举个例子:如果队列满了,但是超时时间还没有到,此时如果不想执行入队操作了,那么就可以中断当前线程(注意:中断不是立即取消,中断后也可能会执行入队操作,参考:Java 并发 —中断机制)。
4、put(E e)
将指定的元素插入此队列的尾部,如果该队列已满,则等待。
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //获取锁,可中断 lock.lockInterruptibly(); try { //如果队列满,则阻塞在notFull 条件上 while (count == items.length) notFull.await(); //入队 enqueue(e); } finally { //释放锁 lock.unlock(); } }
当队列满时,notFull 条件不满足,因此会阻塞在该方法上(await()),这里用的while循环,因为当线程从notFull 条件阻塞中唤醒时,需要重新检查(可以此时被其它线程抢先了,导致还是不满足)。
出队
和入队一样,ArrayBlockingQueue 也提供了很多出队的方法。1、poll()
获取并移除此队列的头,如果此队列为空,则返回 null
public E poll() { final ReentrantLock lock = this.lock; //获取锁 lock.lock(); try { //队列为空返回null,否则返回队头 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; //获取队头元素 E x = (E) items[takeIndex]; items[takeIndex] = null; //新队头位置(队"空") if (++takeIndex == items.length) takeIndex = 0; count--; //维护迭代器 if (itrs != null) itrs.elementDequeued(); //出队了一个元素,队列不满条件满足,唤醒阻塞在队满条件上的一个线程 notFull.signal(); return x; }
从这里出队和前面的入队相结合,可以看出队列是一种逻辑环形队列,队”满”(++putIndex == items.length),队”空”(++takeIndex == items.length)并不是严格意义上的队空,队满,我从网上找了一个环形队列的图,稍稍修改了一下:
对于迭代器部分,这里不会讲,后面会专门来分析迭代器。
2、poll(long timeout, TimeUnit unit)
获取并移除此队列的头部,在指定的等待时间前等待。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //获取锁,可中断 lock.lockInterruptibly(); try { //队空 while (count == 0) { //超时 if (nanos <= 0) return null; //(未超时)等待 nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
3、take() :
获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //可中断的获取锁 lock.lockInterruptibly(); try { //如果队列空,那么阻塞等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
4、peek()
返回队头的元素,但是元素并不会出队。
public E peek() { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { //释放锁 lock.unlock(); } }
final E itemAt(int i) { return (E) items[i]; }
因为底层是数组结构,因此通过索引可以直接访问到队头。
5、remove(Object o)
从此队列中移除指定元素的单个实例
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //队列中有元素 if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { //移除 removeAt(i); return true; } //循环 if (++i == items.length) i = 0; } while (i != putIndex); //循环到达队尾为止 } return false; } finally { lock.unlock(); } }
/** * Deletes item at array index removeIndex. * Utility for remove(Object) and iterator.remove. * Call only when holding lock. */ void removeAt(final int removeIndex) { final Object[] items = this.items; //需要移除元素的位置就是下个需要出队元素的位置,那么和一般出队方法一样 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; //移动队列中的元素 for (int i = removeIndex;;) { //下一个位置索引 int next = i + 1; if (next == items.length) next = 0; //还不是队尾,依次"前移" if (next != putIndex) { items[i] = items[next]; i = next; } else { //移动元素完毕,从新设置队尾索引 items[i] = null; this.putIndex = i; break; } } count--; //维护迭代器 if (itrs != null) itrs.removedAt(removeIndex); } //唤醒阻塞在队列满条件下的一个线程 notFull.signal(); }
这种移除队列中的元素的方式,就相当于删除数组中 非两端的元素一样,需要移动数组中的元素,因此这种方式需要相对开销要大点,对于
ArrayBlockingQueue 尽量不要操作非队头和队尾的元素。
总结
ArrayBlockingQueue 底层是基于数组实现的队列,容量指定后,不会改变。ArrayBlockingQueue 线程安全,和Vector不一样,Vector 中用的synchronized 关键字进行线程同步,ArrayBlockingQueue 中通过ReentrantLock来完成的。
ArrayBlockingQueue 中的ReentrantLock 有公平和非公平之分,因此ArrayBlockingQueue 相当于也有公平性和非公平性之分。
ArrayBlockingQueue 比一般的队列多了两个附加操作(阻塞式的插入和移除方法),这个依赖于Condition实现。
ArrayBlockingQueue 是一种逻辑上的环形队列。
ArrayBlockingQueue 在入队和出队上都使用了同一个重入锁,因此入队和出队是不能并发执行的。
相关文章推荐
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- java阻塞队列ArrayBlockingQueue源码分析
- Java 并发 --- 阻塞队列之LinkedTransferQueue源码分析
- Java多线程与并发应用-(10)-java阻塞队列实现ArrayBlockingQueue
- java 5并发中的阻塞队列ArrayBlockingQueue的使用以及案例实现
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- jdk 源码分析(11)java ArrayBlockingQueue 缓存队列分析
- JAVA并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析(还没看,先马)
- Java阻塞队列ArrayBlockingQueue使用及原理分析
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java 并发 --- 阻塞队列之DelayQueue源码分析
- 【死磕Java并发】-----J.U.C之阻塞队列:ArrayBlockingQueue
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java并发之BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)
- 深入理解阻塞队列(二)——ArrayBlockingQueue源码分析
- [置顶] JAVA并发之BlockingQueue阻塞队列
- Java5 多线程(八)-- ArrayBlockingQueue阻塞队列
- java多线程系列(九)---ArrayBlockingQueue源码分析