您的位置:首页 > 编程语言 > Java开发

Java并发编程之ArrayBlockingQueue

2019-05-26 15:56 295 查看

我们今天来介绍一个新概念,阻塞队列。

阻塞队列

当队列中为空时,从队列中获取元素的操作将被阻塞,当队列满时,向队列中添加元素的操作将被阻塞。

ArrayBlockingQueue

ArrayBlockingQueue是一个由数组组成的有界队列。此队列按照先进先出的顺序进行排序。支持公平锁和非公平锁。

ArrayBlockingQueue的继承关系

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E&
4000
gt;, java.io.Serializable {}

我们看到,ArrayBlockingQueue实现了BlockingQueue接口,该接口表示阻塞型的队列

ArrayBlockingQueue的部分属性

// 存放实际元素的数组
final Object[] items;
// 取元素索引
int takeIndex;
// 获取元素索引
int putIndex;
// 队列中的项
int count;
// 可重入锁
final ReentrantLock lock;
// 等待获取条件
private final Condition notEmpty;
// 等待存放条件
private final Condition notFull;
// 迭代器
transient Itrs itrs = null;

我们需要注意,取元素和存元素有不同的索引

ArrayBlockingQueue的构造函数

无参构造函数
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

创建了一个固定容量和默认访问策略的ArrayBlockingQueue

ArrayBlockingQueue(int, boolean)
public ArrayBlockingQueue(int capacity, boolean fair) {
// 初始容量必须大于0
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
// 初始化等待条件
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}

创建了一个固定容量和指定访问策略的ArrayBlockingQueue

put操作

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 如果当前线程未被中断,则获取锁
lock.lockInterruptibly();
try {
while (count == items.length) // 判断元素是否已满
// 若满,则等待
notFull.await();
// 入队列
enqueue(e);
} finally {
lock.unlock();
}
}

put函数用于存放元素,在当前线程被中断时会抛出异常,并且当队列已经满时,会阻塞一直等待。我们观察到调用了enqueue来实行了入队列操作,下面我们来看看该函数的源码

private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 放入后存元素的索引等于数组长度(表示已满)
// 重置存索引为0
putIndex = 0;
// 元素数量加1
count++;
// 唤醒在notEmpty条件上等待的线程
notEmpty.signal();
}

enqueue会唤醒等待notEmpty条件的线程。

offer操作

public boolean offer(E e) {
// 检查元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) // 元素个数等于数组长度,则返回
return false;
else { // 添加进数组
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

用于存放元素

take操作

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 如果当前线程未被中断,则获取锁,中断会抛出异常
lock.lockInterruptibly();
try {
while (count == 0) // 元素数量为0,即Object数组为空
// 则等待notEmpty条件
notEmpty.await();
// 出队列
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}

与put操作对应,从阻塞队列中获取一个元素。我们观察到调用了dequeue来实行了出队列操作,下面我们来看看该函数的源码

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 该索引的值赋值为null
items[takeIndex] = null;
// 取值索引等于数组长度

1cca8
if (++takeIndex == items.length)
// 重新赋值取值索引
takeIndex = 0;
// 元素个数减1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒在notFull条件上等待的线程
notFull.signal();
return x;
}

poll操作

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 若元素个数为0则返回null,否则,调用dequeue,出队列
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

poll操作与offer对应,用于获取元素。

clear操作

public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 保存元素个数
int k = count;
if (k > 0) { // 元素个数大于0
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// 赋值为null
items[i] = null;
if (++i == items.length) // 重新赋值i
i = 0;
} while (i != putIndex);
// 重新赋值取元素索引
takeIndex = putIndex;
// 元素个数为0
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull条件的线程,则逐一唤醒
notFull.signal();
}
} finally {
lock.unlock();
}
}

clear操作会清空阻塞队列,并且会释放所有等待notFull条件的线程(存放元素的线程)。

add操作

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

remove操作

public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

参考:【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
阻塞队列和ArrayBlockingQueue源码解析(JDK1.8)

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: