您的位置:首页 > 产品设计 > UI/UE

JUC队列-ArrayBlockingQueue(一)

2018-01-14 20:15 369 查看

ArrayBlockingQueue介绍

ArrayBlockingAQueue是用数组实现的线程安全的有界的阻塞队列。

线程安全是指通过“互斥锁”保护竞争资源,实现了对线程对竞争资源的互斥访问,有界是指ArrayB咯KingQueue对应的数组是有界限的,阻塞队列是指当多个线程访问竞争资源时,当竞争资源已经被某个线程获取时,其他要获取该线程的线程需要等待。

注意:ArrayBlockingQueue不同于LinkedBlockingQueue,ArrayBlockingQueue是数组实现的,并且是有界限的;而LinkedBlockingQueue是链表实现的,是无界限的。

ArrayBlokingQueue的uml图:



说明:

ArrayBlockingQueue继承于AbstractQueue,并且它实现了BlockingQueue接口。

ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。

ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。

ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问。

ArrayBlockingQueue源码分析

构造方法

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}


初始化数组,独占锁和两个“条件”,非空条件和满条件。

添加元素

public void put(E e) throws InterruptedException {
checkNotNull(e);
//获取队列的独占锁
final ReentrantLock lock = this.lock;
//获取锁,如果锁处于中断状态,则抛出InterruptedException异常
lock.lockInterruptibly();
try {
//如果队列已满,则一直等待
while (count == items.length)
notFull.await();
//入队
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}


说明:put(E e)的作用是将e插入阻塞队列的尾部。如果队列已满,则等待;否则,插入元素。

在了解入队enqueue操作时,我们先了解下面几个成员的含义:

// 队列中的元素个数
int takeIndex;
// 下一个被取出元素的索引
int putIndex;
// 下一个被添加元素的索引
int count;


enqueue()的源码如下:

private void enqueue(E x) {

final Object[] items = this.items;
//添加到队列中
items[putIndex] = x;
//设置下一个被取出元素的索引
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒notEmpty上的等待线程
notEmpty.signal();
}


取出元素

取出元素的过程其实跟添加元素的过程,这里就直接贴出代码:

public E take() throws InterruptedException {
// 获取“队列的独占锁”
final ReentrantLock lock = this.lock;
// 获取“锁”,若当前线程是中断状态,则抛出InterruptedException异常
lock.lockInterruptibly();
try {
// 若“队列为空”,则一直等待。
while (count == 0)
notEmpty.await();
// 取出元素
return extract();
} finally {
// 释放“锁”
lock.unlock();
}
}


private E extract() {
final Object[] items = this.items;
// 强制将元素转换为“泛型E”
E x = this.<E>cast(items[takeIndex]);
// 将第takeIndex元素设为null,即删除。同时,帮助GC回收。
items[takeIndex] = null;
// 设置“下一个被取出元素的索引”
takeIndex = inc(takeIndex);
// 将“队列中元素数量”-1
--count;
// 唤醒notFull上的等待线程。
notFull.signal();
return x;
}


删除元素

这里我拿remove(Object o) 举例

public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//索引到了末尾,重置0
if (++i == items.length)
i = 0;
//直到 i = takeIndex增长到putIndex
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}


说明:remove(Object)实际上就是对数组的进行遍历对比,只不过这个队列数组有点特殊,它是环状的数组,也就是可重用的数组,如果找到该元素,调用removeAt()

void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果remove的索引时队列中的第一个元素,可以直接出队。
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();

//如果remove的不是第一个元素,那么直接从reomve
//的那个索引开始,后面的元素全部前移一位
} else {
// an "interior" remove

// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息