您的位置:首页 > 产品设计 > UI/UE

基于数组的有界阻塞队列ArrayBlockingQueue源码分析

2016-12-22 17:46 561 查看

一:功能介绍
         基于数组的有界阻塞队列,基于FIFO的存储模式,支持公平非公平锁。
二:源码分析

//数组
final Object[] items;
//出队索引
int takeIndex;
//入队索引
int putIndex;
//队列大小
int count;
//可重入锁
final ReentrantLock lock;
//等待通知条件
private final Condition notEmpty;
//等待通知条件
private final Condition notFull;

  构造函数

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化数组容量
this.items = new Object[capacity];
//内容采用可重入锁ReentrantLock实现,支持公平非公平选择
lock = new ReentrantLock(fair);
//阻塞队列,等待条件
notEmpty = lock.newCondition();
//阻塞队列,等待条件
notFull =  lock.newCondition();
}

   入队操作

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中断获取锁,如果出现了interrupted,不用一直阻塞
lock.lockInterruptibly();
try {
//如果队列已满
while (count == items.length)
//入队线程阻塞
notFull.await();
//插入数据
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
//将新的数据赋值在数组的某一个索引处
items[putIndex] = x;
//重新赋值putIndex,设置下一个被取出元素的索引
putIndex = inc(putIndex);
//队列大小+1
++count;
//唤醒take线程
notEmpty.signal();
}
final int inc(int i) {
//如果队列满了,重新初始化为0
return (++i == items.length) ? 0 : i;
}

    出队操作

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//同上,获取中断锁
lock.lockInterruptibly();
try {
//队列没有值,阻塞
while (count == 0)
notEmpty.await();
//返回被取走的数据
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
//获取takeIndex处的元素
E x = this.<E>cast(items[takeIndex]);
//置空takeIndex处的元素,引用不存在,便于GC,释放内存
items[takeIndex] = null;
//重新赋值takeIndex,设置下一个被取出的元素
takeIndex = inc(takeIndex);
//队列大小-1
--count;
//唤醒put线程
notFull.signal();
return x;
}

  移除数据

public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//从takeIndex处开始计算,每次i加1,最大为队列最大容量count
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
//如果移除元素在数组某个下标找到
if (o.equals(items[i])) {
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(int i) {
final Object[] items = this.items;
//如果准备移除的索引和下一个被取出的元素索引一样,直接移除
if (i == takeIndex) {
//赋值null,便于GC
items[takeIndex] = null;
//重新设置下一个被取出元素的索引
takeIndex = inc(takeIndex);
//如果需要删除的元素索引不是当前被取出的索引
} else {
//一直循环,直到删除为止
for (;;) {
//假设队列容量是4,目前存了3个元素,即takeIndex=0,putIndex=3,目前我打算删除数组下标为1的元素
// nexti第一次为2
int nexti = inc(i);
if (nexti != putIndex) {
//相当于将队列往前移
items[i] = items[nexti];
//相当于i+1
i = nexti;
//待删除的索引与待put的索引相等,比如putIndex=2,i=1,inc(i) = 2
} else {
//索引i处置null,偏于GC
items[i] = null;
//重新赋值下一个即将放入元素的索引
putIndex = i;
break;
}
}
}
//队列大小-1
--count;
//唤醒put线程,公平的话按FIFO顺序,非公平的话可以抢占
notFull.signal();
}

    遍历队列

public Iterator<E> iterator() {
return new Itr();
}

private class Itr implements Iterator<E> {
//队列里面还剩的元素个数
private int remaining;
//下一次调用next()返回的索引
private int nextIndex;
//下一次调用next()返回的元素
private E nextItem;
//上一次调用next()返回的元素
private E lastItem;
//上一次调用next()返回的索引
private int lastRet;

Itr() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
lastRet = -1;
//只有队列里面还有元素
if ((remaining = count) > 0)
//获取takeIndex处的元素
nextItem = itemAt(nextIndex = takeIndex);
} finally {
lock.unlock();
}
}

public boolean hasNext() {
return remaining > 0;
}

public E next() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//如果队列没有值
if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
//获取下一次获取索引处的元素
E x = itemAt(nextIndex);  // check for fresher value
if (x == null) {
x = nextItem;         // we are forced to report old value
lastItem = null;      // but ensure remove fails
}
else
//将刚获取的元素当做上一次获取的元素
lastItem = x;
//当下一次获取的元素不存在的时候
while (--remaining > 0 && // skip over nulls
(nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
;
return x;
} finally {
lock.unlock();
}
}

public void remove() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
int i = lastRet;
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
E x = lastItem;
lastItem = null;
// only remove if item still at index
if (x != null && x == items[i]) {
boolean removingHead = (i == takeIndex);
removeAt(i);
if (!removingHead)
nextIndex = dec(nextIndex);
}
} finally {
lock.unlock();
}
}
}
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: