您的位置:首页 > 产品设计 > UI/UE

多线程(十八、阻塞队列-ArrayBlockingQueue)

2019-07-04 21:55 1651 查看

ArrayBlockingQueue简介

1、队列的容量一旦在构造时指定,后续不能改变;
2、插入元素时,在队尾进行;删除元素时,在队首进行;
3、队列满时,插入元素会阻塞线程;队列空时,删除元素也会阻塞线程;
4、支持公平/非公平策略,默认为非公平策略。

ArrayBlockingQueue构造

核心构造方法:

/**
* 指定队列初始容量和公平/非公平策略的构造器.
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();

this.items = new Object[capacity];
lock = new ReentrantLock(fair);     // 利用独占锁的策略
notEmpty = lock.newCondition();  //当队列空时,线程在该队列等待获取
notFull = lock.newCondition();  // 当队列满时,线程在该队列等待插入
}

成员变量

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* 内部数组
*/
final Object[] items;

/**
* 下一个待删除位置的索引: take, poll, peek, remove方法使用
*/
int takeIndex;

/**
* 下一个待插入位置的索引: put, offer, add方法使用
*/
int putIndex;

/**
* 队列中的元素个数
*/
int count;

/**
* 全局锁
*/
final ReentrantLock lock;

/**
* 非空条件队列:当队列空时,线程在该队列等待获取
*/
private final Condition notEmpty;

/**
* 非满条件队列:当队列满时,线程在该队列等待插入
*/
private final Condition notFull;
}

方法

ArrayBlockingQueue方法一共4个:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我们先来看插入元素的方法。

插入数据-put

/**
* 在队尾插入指定元素,如果队列已满,则阻塞线程.
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();   // 加锁
try {
while (count == items.length)   // 队列已满。这里必须用while
notFull.await();             // 在notFull队列上等待
enqueue(e);                     // 队列未满, 直接入队
} finally {
lock.unlock();
}
}

入队方法:enqueue

/**
* 每次插入一个元素都会唤醒一个等待线程来处理
*/
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)     // 队列已满,则重置索引为0
putIndex = 0;
count++;                            // 元素个数+1
notEmpty.signal();                  // 唤醒一个notEmpty上的等待线程(可以来队列取元素了)
}

删除元素-take

/**
* 从队首删除一个元素, 如果队列为空, 则阻塞线程
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)      // 队列为空, 则线程在notEmpty条件队列等待
notEmpty.await();
return dequeue();       // 队列非空,则出队一个元素
} finally {
lock.unlock();
}
}

出队函数:dequeue

/**
*删除一个元素,则唤醒一个等待插入的线程
*/
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)    // 如果队列已空,重置获取索引
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();                   // 唤醒一个notFull上的等待线程(可以插入元素到队列了)
return x;
}

环形结构

1、初始化:

2、插入元素“9”

3、插入元素“2”、“10”、“25”、“93”

4、插入元素“90”

注意,此时再插入一个元素“90”,则putIndex变成6,等于队列容量6,由于是循环队列,所以会将tableIndex重置为0。

重置代码看这里:

5、出队元素“9”

6、出队元素“2”、“10”、“25”、“93”

7、出队元素“90”
注意,此时再出队一个元素“90”,则tabeIndex变成6,等于队列容量6,由于是循环队列,所以会将tableIndex重置为0

重置代码看这里

总结

1、ArrayBlockingQueue利用了ReentrantLock来保证线程的安全性,针对队列的修改都需要加全局锁。
2、ArrayBlockingQueue是有界的,且在初始时指定队列大小。
3、ArrayBlockingQueue的内部数组其实是一种环形结构。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: