JDK 1.8 ArrayBlockingQueue 源码阅读(一)插入
2017-02-26 22:58
585 查看
在工作环境中,我们通常会使用ArrayBlockingQueue 作为任务管理队列,并且之前在Qunar开发的时候,也用过ArrayBlockingQueue作为异步队列,实现了一个异步通知的程序。最近有点时间,阅读一下这个工具。
我们知道ArrayBlockingQueue是一个FIFO的有界阻塞式对列,每次取对列的元素的时候,取的是队首的元素,而每次往这个对列的队尾放元素。
先看一下这个class 包含有的属性
items 这个属性是用来保存队列里面的元素的,因为这个队列叫做Array,所以元素用数组保存。
takeIndex 这个属性是标明每次取出来的元素位置,这个可以理解为队首。
putIndex 这个属性标明每次放的位置,可以理解成队尾。
count 这个属性表示队列中元素的数量,这样保存有个好处,就不用每次都去遍历,数出元素个数。
lock 这个是锁,用来防止并发操作。
notEmpty 取出元素的等待条件。
notFull 往队列中放元素的等待条件。
itrs 因为这个queue实现了Iterable 接口,该属性保存了迭代器的状态。
2. 方法源码阅读
属性介绍完了,我们来看一看这个类包含的方法,以及方法的具体实现。
这个类提供了三个构造函数,我这里只列出了两个,因为有一个就是调用了我列的第二个。
首先看参数列表为int capacity, boolean fair
这个构造函数只是初始化了数组,两个等待条件,还有用来同步的锁,这个锁可以选择是否是公平锁还是非公平锁。
再看另一个构造函数可以传递一个Collection的构造函数。
该构造函数可以以一个现有的实现了Collection接口的元素为基础创建队列,不过,这里使用了ReentrantLock来锁住代码块,是不是很奇怪?注释里面说,这个锁只是用来保证可见性,并不是用来互斥的。并且,该方法catch住了ArrayIndexOutOfBoundsException 异常,这是因为,c是由外部传进来的,构造函数并不能保证c不被其他线程修改,所以有可能会在遍历的过程中抛ArrayIndexOutOfBoundsException(具体可以参考《java 并发编程实战》)
往下就是我们平时最关心的几个方法了
add(E e) 当队列不满,调用offer方法,当队列已满,抛出IllegalStateException 异常
offer(E e) 当队列不满,将元素放进队列,并返回成功,当队列满了,立刻返回false
put(E e) 阻塞式插入,如果队列元素满,会阻塞到队列有空间
offer(E e, long timeout, TimeUnit unit) 等待timeout时间的offer
这些方法都是往队列里面放元素的操作,但是它们有一些细微的不同,我们可以来看一下源码
先看offer操作,该操作的作用是,如果队列满了,会立刻返回false,如果队列没满,将元素插入队列,此方法调用了checkNotNull方法,在传递进来的元素为空会抛出NullPointerException异常。
这个方法使用了对象的锁,将队列长度的校验和插入操作锁起来,避免了并发问题。
再看add操作
从add方法代码可以看出,这个方法会先去调用offer ,如果返回false ,直接抛出异常。
下面接着看put方法,这个是我认为BlockingQueue提供的核心方法了,即阻塞的插入队列,之前和我们组的一个应届生讨论,他觉得这个阻塞应该是自旋式的阻塞,但是我说看一下代码,发现其实用的是Condition类的await,这个方法最后使用了LockSupport.unpark原语进行操作(有空可以研究一下Condition的代码)
最后再看带有等待时间的offer,该方法调用了Condition的awaitNanos ,相当于await一定的时间。
以上几个往队列里面插入元素的方法,最后都会调用到,这个方法很简单,就是在putIndex位置插入元素,但是最后会使用notEmpty的signal,因为插入之后,队列不为空,需要通知阻塞在take等方法的线程
我们知道ArrayBlockingQueue是一个FIFO的有界阻塞式对列,每次取对列的元素的时候,取的是队首的元素,而每次往这个对列的队尾放元素。
先看一下这个class 包含有的属性
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;
items 这个属性是用来保存队列里面的元素的,因为这个队列叫做Array,所以元素用数组保存。
takeIndex 这个属性是标明每次取出来的元素位置,这个可以理解为队首。
putIndex 这个属性标明每次放的位置,可以理解成队尾。
count 这个属性表示队列中元素的数量,这样保存有个好处,就不用每次都去遍历,数出元素个数。
lock 这个是锁,用来防止并发操作。
notEmpty 取出元素的等待条件。
notFull 往队列中放元素的等待条件。
itrs 因为这个queue实现了Iterable 接口,该属性保存了迭代器的状态。
2. 方法源码阅读
属性介绍完了,我们来看一看这个类包含的方法,以及方法的具体实现。
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @param c the collection of elements to initially contain * @throws IllegalArgumentException if {@code capacity} is less than * {@code c.size()}, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
这个类提供了三个构造函数,我这里只列出了两个,因为有一个就是调用了我列的第二个。
首先看参数列表为int capacity, boolean fair
这个构造函数只是初始化了数组,两个等待条件,还有用来同步的锁,这个锁可以选择是否是公平锁还是非公平锁。
再看另一个构造函数可以传递一个Collection的构造函数。
该构造函数可以以一个现有的实现了Collection接口的元素为基础创建队列,不过,这里使用了ReentrantLock来锁住代码块,是不是很奇怪?注释里面说,这个锁只是用来保证可见性,并不是用来互斥的。并且,该方法catch住了ArrayIndexOutOfBoundsException 异常,这是因为,c是由外部传进来的,构造函数并不能保证c不被其他线程修改,所以有可能会在遍历的过程中抛ArrayIndexOutOfBoundsException(具体可以参考《java 并发编程实战》)
往下就是我们平时最关心的几个方法了
add(E e) 当队列不满,调用offer方法,当队列已满,抛出IllegalStateException 异常
offer(E e) 当队列不满,将元素放进队列,并返回成功,当队列满了,立刻返回false
put(E e) 阻塞式插入,如果队列元素满,会阻塞到队列有空间
offer(E e, long timeout, TimeUnit unit) 等待timeout时间的offer
这些方法都是往队列里面放元素的操作,但是它们有一些细微的不同,我们可以来看一下源码
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
先看offer操作,该操作的作用是,如果队列满了,会立刻返回false,如果队列没满,将元素插入队列,此方法调用了checkNotNull方法,在传递进来的元素为空会抛出NullPointerException异常。
这个方法使用了对象的锁,将队列长度的校验和插入操作锁起来,避免了并发问题。
再看add操作
public boolean add(E e) { return super.add(e); } /**将超类的代码贴在下面*/ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
从add方法代码可以看出,这个方法会先去调用offer ,如果返回false ,直接抛出异常。
下面接着看put方法,这个是我认为BlockingQueue提供的核心方法了,即阻塞的插入队列,之前和我们组的一个应届生讨论,他觉得这个阻塞应该是自旋式的阻塞,但是我说看一下代码,发现其实用的是Condition类的await,这个方法最后使用了LockSupport.unpark原语进行操作(有空可以研究一下Condition的代码)
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
最后再看带有等待时间的offer,该方法调用了Condition的awaitNanos ,相当于await一定的时间。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
以上几个往队列里面插入元素的方法,最后都会调用到,这个方法很简单,就是在putIndex位置插入元素,但是最后会使用notEmpty的signal,因为插入之后,队列不为空,需要通知阻塞在take等方法的线程
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
相关文章推荐
- JDK 1.8 ArrayBlockingQueue 源码阅读(二)获取
- JDK1.8 ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析(基于JDK1.8)
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- java源码阅读之ArrayBlockingQueue
- java1.7集合源码阅读:ArrayBlockingQueue
- ArrayBlockingQueue源码阅读
- ArrayBlockingQueue源码阅读与理解
- 阅读ArrayBlockingQueue源码了解如何利用锁实现BlockingQueue
- JDK 源码解析 —— ArrayBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue
- ArrayBlockingQueue源码阅读心得
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- ArrayBlockingQueue和LinkedBlockingQueue源码分析(jdk1.8)
- ArrayBlockingQueue 源码阅读与分析
- jdk 源码分析(11)java ArrayBlockingQueue 缓存队列分析