您的位置:首页 > 产品设计 > UI/UE

04.JUC 集合 - ArrayBlockingQueue

2017-02-23 19:19 447 查看

基本概念

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。

内部构造

构造函数,ArrayBlockingQueue 内部维护了一个数组。因为它是有界队列,所以数组容量是固定不变的,不会进行扩容操作。并且采用了单锁+双条件队列来解决出入队操作的冲突问题。

// 内部数组
final Object[] items;

// 重入锁
final ReentrantLock lock;

// 条件,用于出队操作
private final Condition notEmpty;

// 条件,用于入队操作
private final Condition notFull;

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0){
// 抛出异常...
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}


出入队位置,ArrayBlockingQueue 依靠出入队位置从而达到循环使用数组。

// 出队位置
int takeIndex;
// 入队位置
int putIndex;


初始化时,出入队位置都为 0 。



正常情况下,入队位置都是在出队位置前面。因为只有先入队,才能执行出队操作。



当入队位置到达数组末尾后,又从数组初始开始执行入队。此时入队位置在出队位置之前。



入队操作

add,成功返回 ture,失败抛出异常

public boolean add(E e) {
return super.add(e);
}

// BlockingQueue.add
public boolean add(E e) {
if (offer(e)){
return true;
}else{
// 抛出异常...
}
}


offer,成功返回 true,失败返回 false

public boolean offer(E e) {
// e 为空抛出异常
checkNotNull(e);

// 加锁
final ReentrantLock lock = this.lock;
lock.lock();

try {
// 判断队列的元素个数是否满了
if (count == items.length){
return false;
}else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}


put,失败则进入阻塞,知道成功。

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列满了,则进入 notFull 条件等待队列
while (count == items.length){
notFull.await();
}
insert(e);
} finally {
lock.unlock();
}
}


关键

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;

// 唤醒出队时因为空队列而进入 notEmpty 等待队列的线程
notEmpty.signal();
}

final int inc(int i) {
// 关键 -> 循环增加,一旦到达末尾又重新回到 0
return (++i == items.length) ? 0 : i;
}


出队操作

remove,失败返回 false,成功返回 true。

public boolean remove(Object o) {
if (o == null){
return false;
}
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 遍历内部数组的所有元素,从 takeIndex 开始
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
// 存在匹配元素,则删除
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}

// 删除指定位置的元素
void removeAt(int i) {
final Object[] items = this.items;

// 判断是否在 takeIndex 上
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// 关键 -> 若指定位置,不在 takeIndex 上,那么表示它一定在 takeIndex - PutIndex 之间。
// 需要将 [(i+1) - putIndex ] 位置的元素,前移一位,通过覆盖 i 位置的元素,来达到删除的效果。
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null;
putIndex = i;
break;
}
}
}
--count;
// 唤醒入队时因为满队列而进入 notFull 等待队列的线程
notFull.signal();
}


poll,成功返回 true,失败返回 false。

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}


take,成功返回被操作的值,失败则进入阻塞直至成功。

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0){
notEmpty.await();
}
return extract();
} finally {
lock.unlock();
}
}


关键

private E extract() {
final Object[] items = this.items;
E x = this.<E> cast(items[takeIndex]);
items[takeIndex] = null;

takeIndex = inc(takeIndex);
--count;

notFull.signal();
return x;
}

// 类型转换
@(JUC 集合)
static <E> E cast(Object item) {
return (E) item;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息