ArrayBlockingQueue源码解读
2016-11-21 01:42
351 查看
ArrayBlockingQueue(数组阻塞队列)是jdk自带的阻塞队列的一个实现类。分析它之前,先说明一下,阻塞队列的特性:所谓阻塞队列,它与正常队列的区别就是,当队列为空时,获取队列元素的操作会被阻塞,直到其他线程往队列里插入新的元素;当队列满时,插入队列的操作会被阻塞,直到其他线程从队列里取出元素。
ArrayBlockingQueue实现了阻塞队列接口BlockingQueue,该接口定义的方法如下:
其中最主要的就是add(E)、offer(E)、put(E)、offer(E,long,TimeUnit)、take()、poll(long,TimeUnit)、remove(Object)这几个方法.
查看add方法说明:
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* <tt>true</tt> upon success and throwing an
* <tt>IllegalStateException</tt> if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
大概意思就是当队列容量够时,进行插入并返回true,当容量不够时,抛出IllegalStateException异常。
接口既然这么定义了,我们看实现类具体如何实现的。
ArrayBlockingQueue中实现的方法非常简单,就是如下一句话,调用了父类的add方法。
public boolean add(E e) {
return super.add(e);
}
我们仔细查看ArrayBlockingQueue,发现除了实现BlockingQueue接口外,还继承了AbstractQueue。AbstractQueue类是队列实现的一个抽象类,其中定义了一些模板方法供子类使用。我们看AbstractQueue的add方法:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
结果恍然大悟,原来调用了offer(E)方法,当返回false时,抛出异常。再来跟踪offer方法,实际上是一个抽象方法,具体在ArrayBlockingQueue中实现,放到后边再说。
从这一段代码,我们也能感受到jdk自带的集合类里面那满满的“套路”。大家都说多一点真诚,少一点套路,实际上,套路用多了才是王道啊,说明除了真诚之外,更多了一些思考和总结。呵呵,此处说了一些废话,这里说的套路,就是所谓的模式,这里使用了模板模式,虽然这个地方用的比较简单,但是也能体会到模板模式的好处。在抽象类里面使用了模板方法后,其余实现类只需要实现真正插入元素的offer方法即可。另外,我个人还感觉这里使用了适配器模式,讲AbstractQueue中的方法适配成BlockingQueue的接口。本人刚刚开始研究模式,有说的不对的地方请大家指正。
言归正传,咱们仔细观察一下ArrayBlockingQueue类,首先查看定义的变量
其中E[] items就是主角,是真正存放元素的数组。
takeIndex、是执行下一次take、poll、remove方法时,对应元素在数组的索引。
putIndex、是执行下一次 put,、offer、 add方法时,对应元素在数组的索引。
count是当前队列元素的数量。
lock为对队列进行操作时的锁;
notEmpty、notFull分别为唤醒take操作阻塞的线程和唤醒put操作阻塞的线程;
这些变量具体作用,结合后边的分析进行介绍。
首先,看ArrayBlockingQueue构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
capacity为阻塞队列容量、fair为锁类型,true为公平锁,保证被阻塞线程按照先后顺序依次被唤醒,false为非公平锁,不保证线程唤醒顺序。从性能上来讲,非公平锁性能要高于公平锁。因为公平锁增加了线程切换的频率。但是非公平锁可能导致有些线程一直处于饥饿状态,无法获得cpu执行时间。关于锁的问题,本次不再详细解释,大家可以去查资料。我们看构造函数做了哪些事情:
1、 初始化数组;
2、 初始化锁;
3、 初始化notEmpty 、notFull 两个条件,为以后唤醒阻塞线程服务;
至此,一个数组阻塞队列初始化完成,可以往里面插入元素了,我们先研究add,前面已经说过,add实际上调用的就是offer,只不过增加了抛出异常功能。那我们看offer具体到底做了什么。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
首先加锁,保证操作的线程安全。如果已有元素数量等于容器容量,则返回false。否则,调用insert方法,插入元素。
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
关键看第二句,修改putIndex的值。
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
短短一句话,反应了作者对数组操作的精心设计。
大家知道,数据在内存中是一块连续的内存,正常我们的想法是,先插入的元素放到数组的第一个位置,后插入的往后增加。但是如果是这种情况的话,当第一个元素出队列时,我们还要将后续元素依次往前迭代设值,这样才能保证下次出队列的元素位于数组的第一个位置。但是这样的话会频繁操作数组,每出队列一个元素就要重新设置一次,极大的影响了操作的性能。作者就此想出了一个巧妙的方法,每次出入队列时,不调整元素的位置,而是设置队首的索引和队尾的索引。putindex就是队尾的索引,下次插入元素时,直接插入到该位置,并将putIndex往后设置。takeIndex是队首的索引,下次出队列时,直接删除该位置,并将takeIndex往后设置。当takeIndex和putIndex走到数组最后y
一个位置时候,将其从头开始,这样就形成了一个循环的环。
理解了上述作者的意图,大家再看就没有什么难度了。insert元素时,循环往后设置putIndex的值。
insert方法最后调用notEmpty.signal()方法,唤醒获取元素被阻塞的线程。
offer分析完毕,再看put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
可以看到,加锁后,当队列已满事,调用notFull.await();阻塞当前线程。未满时,调用insert插入元素。
插入方法分析完毕,实际上,删除方法和插入方法类似,不再一一分析。
如有分析不对的地方,欢迎大家指正。谢谢!
ArrayBlockingQueue实现了阻塞队列接口BlockingQueue,该接口定义的方法如下:
其中最主要的就是add(E)、offer(E)、put(E)、offer(E,long,TimeUnit)、take()、poll(long,TimeUnit)、remove(Object)这几个方法.
查看add方法说明:
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* <tt>true</tt> upon success and throwing an
* <tt>IllegalStateException</tt> if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
大概意思就是当队列容量够时,进行插入并返回true,当容量不够时,抛出IllegalStateException异常。
接口既然这么定义了,我们看实现类具体如何实现的。
ArrayBlockingQueue中实现的方法非常简单,就是如下一句话,调用了父类的add方法。
public boolean add(E e) {
return super.add(e);
}
我们仔细查看ArrayBlockingQueue,发现除了实现BlockingQueue接口外,还继承了AbstractQueue。AbstractQueue类是队列实现的一个抽象类,其中定义了一些模板方法供子类使用。我们看AbstractQueue的add方法:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
结果恍然大悟,原来调用了offer(E)方法,当返回false时,抛出异常。再来跟踪offer方法,实际上是一个抽象方法,具体在ArrayBlockingQueue中实现,放到后边再说。
从这一段代码,我们也能感受到jdk自带的集合类里面那满满的“套路”。大家都说多一点真诚,少一点套路,实际上,套路用多了才是王道啊,说明除了真诚之外,更多了一些思考和总结。呵呵,此处说了一些废话,这里说的套路,就是所谓的模式,这里使用了模板模式,虽然这个地方用的比较简单,但是也能体会到模板模式的好处。在抽象类里面使用了模板方法后,其余实现类只需要实现真正插入元素的offer方法即可。另外,我个人还感觉这里使用了适配器模式,讲AbstractQueue中的方法适配成BlockingQueue的接口。本人刚刚开始研究模式,有说的不对的地方请大家指正。
言归正传,咱们仔细观察一下ArrayBlockingQueue类,首先查看定义的变量
其中E[] items就是主角,是真正存放元素的数组。
takeIndex、是执行下一次take、poll、remove方法时,对应元素在数组的索引。
putIndex、是执行下一次 put,、offer、 add方法时,对应元素在数组的索引。
count是当前队列元素的数量。
lock为对队列进行操作时的锁;
notEmpty、notFull分别为唤醒take操作阻塞的线程和唤醒put操作阻塞的线程;
这些变量具体作用,结合后边的分析进行介绍。
首先,看ArrayBlockingQueue构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
capacity为阻塞队列容量、fair为锁类型,true为公平锁,保证被阻塞线程按照先后顺序依次被唤醒,false为非公平锁,不保证线程唤醒顺序。从性能上来讲,非公平锁性能要高于公平锁。因为公平锁增加了线程切换的频率。但是非公平锁可能导致有些线程一直处于饥饿状态,无法获得cpu执行时间。关于锁的问题,本次不再详细解释,大家可以去查资料。我们看构造函数做了哪些事情:
1、 初始化数组;
2、 初始化锁;
3、 初始化notEmpty 、notFull 两个条件,为以后唤醒阻塞线程服务;
至此,一个数组阻塞队列初始化完成,可以往里面插入元素了,我们先研究add,前面已经说过,add实际上调用的就是offer,只不过增加了抛出异常功能。那我们看offer具体到底做了什么。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
首先加锁,保证操作的线程安全。如果已有元素数量等于容器容量,则返回false。否则,调用insert方法,插入元素。
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
关键看第二句,修改putIndex的值。
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
短短一句话,反应了作者对数组操作的精心设计。
大家知道,数据在内存中是一块连续的内存,正常我们的想法是,先插入的元素放到数组的第一个位置,后插入的往后增加。但是如果是这种情况的话,当第一个元素出队列时,我们还要将后续元素依次往前迭代设值,这样才能保证下次出队列的元素位于数组的第一个位置。但是这样的话会频繁操作数组,每出队列一个元素就要重新设置一次,极大的影响了操作的性能。作者就此想出了一个巧妙的方法,每次出入队列时,不调整元素的位置,而是设置队首的索引和队尾的索引。putindex就是队尾的索引,下次插入元素时,直接插入到该位置,并将putIndex往后设置。takeIndex是队首的索引,下次出队列时,直接删除该位置,并将takeIndex往后设置。当takeIndex和putIndex走到数组最后y
一个位置时候,将其从头开始,这样就形成了一个循环的环。
理解了上述作者的意图,大家再看就没有什么难度了。insert元素时,循环往后设置putIndex的值。
insert方法最后调用notEmpty.signal()方法,唤醒获取元素被阻塞的线程。
offer分析完毕,再看put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
可以看到,加锁后,当队列已满事,调用notFull.await();阻塞当前线程。未满时,调用insert插入元素。
插入方法分析完毕,实际上,删除方法和插入方法类似,不再一一分析。
如有分析不对的地方,欢迎大家指正。谢谢!
相关文章推荐
- ArrayBlockingQueue源码解读
- ArrayBlockingQueue源码解析
- ArrayBlockingQueue源码分析
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码剖析
- ArrayBlockingQueue源码中为什么方法要用局部变量引用类变量
- ArrayBlockingQueue源码分析
- 源码解析关于java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等
- Java ArrayBlockingQueue 源码
- ArrayBlockingQueue源码解析
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- ArrayBlockingQueue源码分析
- [源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- Java集合, ArrayBlockingQueue源码解析(常用于并发编程)
- java多线程系列(九)---ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码解析