您的位置:首页 > 产品设计 > UI/UE

ArrayBlockingQueue源码解读

2016-11-21 01:42 351 查看
ArrayBlockingQueue(数组阻塞队列)是jdk自带的阻塞队列的一个实现类。分析它之前,先说明一下,阻塞队列的特性:所谓阻塞队列,它与正常队列的区别就是,当队列为空时,获取队列元素的操作会被阻塞,直到其他线程往队列里插入新的元素;当队列满时,插入队列的操作会被阻塞,直到其他线程从队列里取出元素。

ArrayBlockingQueue实现了阻塞队列接口BlockingQueue,该接口定义的方法如下:

其中最主要的就是add(E)、offer(E)、put(E)、offer(E,long,TimeUnit)、take()、poll(long,TimeUnit)、remove(Object)这几个方法.

查看add方法说明:

/**

* Inserts the specified element into this queue if it is possible to do

* so immediately without violating capacity restrictions, returning

* <tt>true</tt> upon success and throwing an

* <tt>IllegalStateException</tt> if no space is currently available.

* When using a capacity-restricted queue, it is generally preferable to

* use {@link #offer(Object) offer}.

*

* @param e the element to add

* @return <tt>true</tt> (as specified by {@link Collection#add})

* @throws IllegalStateException if the element cannot be added at this

* time due to capacity restrictions

* @throws ClassCastException if the class of the specified element

* prevents it from being added to this queue

* @throws NullPointerException if the specified element is null

* @throws IllegalArgumentException if some property of the specified

* element prevents it from being added to this queue

*/

大概意思就是当队列容量够时,进行插入并返回true,当容量不够时,抛出IllegalStateException异常。

接口既然这么定义了,我们看实现类具体如何实现的。

ArrayBlockingQueue中实现的方法非常简单,就是如下一句话,调用了父类的add方法。

public boolean add(E e) {

return super.add(e);

}

我们仔细查看ArrayBlockingQueue,发现除了实现BlockingQueue接口外,还继承了AbstractQueue。AbstractQueue类是队列实现的一个抽象类,其中定义了一些模板方法供子类使用。我们看AbstractQueue的add方法:

public boolean add(E e) {

if (offer(e))

return true;

else

throw new IllegalStateException("Queue full");

}

结果恍然大悟,原来调用了offer(E)方法,当返回false时,抛出异常。再来跟踪offer方法,实际上是一个抽象方法,具体在ArrayBlockingQueue中实现,放到后边再说。

从这一段代码,我们也能感受到jdk自带的集合类里面那满满的“套路”。大家都说多一点真诚,少一点套路,实际上,套路用多了才是王道啊,说明除了真诚之外,更多了一些思考和总结。呵呵,此处说了一些废话,这里说的套路,就是所谓的模式,这里使用了模板模式,虽然这个地方用的比较简单,但是也能体会到模板模式的好处。在抽象类里面使用了模板方法后,其余实现类只需要实现真正插入元素的offer方法即可。另外,我个人还感觉这里使用了适配器模式,讲AbstractQueue中的方法适配成BlockingQueue的接口。本人刚刚开始研究模式,有说的不对的地方请大家指正。

言归正传,咱们仔细观察一下ArrayBlockingQueue类,首先查看定义的变量

其中E[] items就是主角,是真正存放元素的数组。

takeIndex、是执行下一次take、poll、remove方法时,对应元素在数组的索引。

putIndex、是执行下一次 put,、offer、 add方法时,对应元素在数组的索引。

count是当前队列元素的数量。

lock为对队列进行操作时的锁;

notEmpty、notFull分别为唤醒take操作阻塞的线程和唤醒put操作阻塞的线程;

这些变量具体作用,结合后边的分析进行介绍。

首先,看ArrayBlockingQueue构造函数

public ArrayBlockingQueue(int capacity, boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

this.items = (E[]) new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

capacity为阻塞队列容量、fair为锁类型,true为公平锁,保证被阻塞线程按照先后顺序依次被唤醒,false为非公平锁,不保证线程唤醒顺序。从性能上来讲,非公平锁性能要高于公平锁。因为公平锁增加了线程切换的频率。但是非公平锁可能导致有些线程一直处于饥饿状态,无法获得cpu执行时间。关于锁的问题,本次不再详细解释,大家可以去查资料。我们看构造函数做了哪些事情:

1、 初始化数组;

2、 初始化锁;

3、 初始化notEmpty 、notFull 两个条件,为以后唤醒阻塞线程服务;
至此,一个数组阻塞队列初始化完成,可以往里面插入元素了,我们先研究add,前面已经说过,add实际上调用的就是offer,只不过增加了抛出异常功能。那我们看offer具体到底做了什么。

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
首先加锁,保证操作的线程安全。如果已有元素数量等于容器容量,则返回false。否则,调用insert方法,插入元素。
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
关键看第二句,修改putIndex的值。
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
短短一句话,反应了作者对数组操作的精心设计。
大家知道,数据在内存中是一块连续的内存,正常我们的想法是,先插入的元素放到数组的第一个位置,后插入的往后增加。但是如果是这种情况的话,当第一个元素出队列时,我们还要将后续元素依次往前迭代设值,这样才能保证下次出队列的元素位于数组的第一个位置。但是这样的话会频繁操作数组,每出队列一个元素就要重新设置一次,极大的影响了操作的性能。作者就此想出了一个巧妙的方法,每次出入队列时,不调整元素的位置,而是设置队首的索引和队尾的索引。putindex就是队尾的索引,下次插入元素时,直接插入到该位置,并将putIndex往后设置。takeIndex是队首的索引,下次出队列时,直接删除该位置,并将takeIndex往后设置。当takeIndex和putIndex走到数组最后y
一个位置时候,将其从头开始,这样就形成了一个循环的环。

理解了上述作者的意图,大家再看就没有什么难度了。insert元素时,循环往后设置putIndex的值。

insert方法最后调用notEmpty.signal()方法,唤醒获取元素被阻塞的线程。

offer分析完毕,再看put

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

final E[] items = this.items;

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

try {

while (count == items.length)

notFull.await();

} catch (InterruptedException ie) {

notFull.signal(); // propagate to non-interrupted thread

throw ie;

}

insert(e);

} finally {

lock.unlock();

}

}

可以看到,加锁后,当队列已满事,调用notFull.await();阻塞当前线程。未满时,调用insert插入元素。

插入方法分析完毕,实际上,删除方法和插入方法类似,不再一一分析。
如有分析不对的地方,欢迎大家指正。谢谢!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ArrayBlockingQueue java