您的位置:首页 > 产品设计 > UI/UE

Java并发学习(二十二)-ArrayBlockingQueue分析

2018-01-07 10:06 609 查看
这两天花了几个小时来看ArrayBlockingQueue,阻塞队列。其实它的实现思想是比较简单的,主要是利用ReentrantLock和Condition来实现。首先理解什么是阻塞队列:

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

在队列为空时,获取元素的线程会等待队列变为非空。

当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

也就是说,阻塞队列,这里以ArrayBlockQueue来说明,用生产者消费者模型来理解再好不过了。当队列里有东西,你可以进行取操作,没有东西时候,你就需要阻塞起来,等有东西再取。

What is ArrayBlockingQueue

首先从名字里面可以看出来,ArrayBlockingQueue里面主要的数据结构就是一个数组,使用ReentrantLock对其进行加锁,使用Condition实现存取等待。

接下来看里面定义:

/**
* FIFO特性。
* head元素是队列里面存在最久的元素。
* 非空
* 一旦创建,大小就不能改变。
* 如果队列满了,再想入対就会阻塞
* 支持公平锁策略。默认是非公平的。
*/
public class ArrayBlockingQueueAnlysis<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

final Object[] items;     //用来存items

int takeIndex;     //出队,获取的下一个index。

int putIndex;         //add或者put的下一个index

int count;              //元素的个数

final ReentrantLock lock;      //主要的一把锁

private final Condition notEmpty;     //空的condition

private final Condition notFull;     //满的condition

transient Itrs itrs = null;   //用于以链表方式存储所有已经创建的iterator。
...
}


上述代码中,大体都有注释,我觉得比较值得思考的有普通的两个index,两个Condition,当然还有一个

itrs这个变量,感觉是个比较新的思想,下文会聊。

add操作

由add操作:

public boolean add(E e) {
return super.add(e);
}


最终父类的add会调用子类具体实现的offer方法:

public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();           //加锁。
try {
if (count == items.length)
return false;
else {
enqueue(e);              //入队。
return true;
}
} finally {
lock.unlock();
}
}


在offer方法里面,会用lock进行加锁,也就是一次只能一个线程对queue进行操作,最终,会调用enqueue方法:

private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}


插入的就是简单的利用putIndex,在数组里面新增加一个元素,然后调用
notEmpty.signal();


跟所有在等待取数的线程说,喂,有东西了,来拿吧!

从上文add相关代码来看,还是比较好理解的,存储结构是数组,逻辑结构是队列,利用ReentrantLock进行加锁。

使用Condition实现等待/通知模式

put操作

对于add,还有一个类似的put操作:

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();          //如果满了,调用notFull这个condition加入队列睡眠等待。
enqueue(e);
} finally {
lock.unlock();
}
}


实现逻辑基本一致,注意一旦队列满了,需要调用
notFull.await()
等待。

take操作

现在看看出队的take操作:

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}


同样如果count==0,则需要等待。接下来看dequeue方法:

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;     //当前位置置null,有利于回收。
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();       //把itrs里面也出队一个。
notFull.signal();
return x;
}


同样是针对数组的简单出队,从队尾出。会调用一次
itrs.elementDequeued();
下文提到。

其他方法

里面还有一些其他方法例如:

remainingCapacity() : 返回数组的甚于容量

remove(Object o) :删除某一个特定值为o的元素

size() :返回该元素的数量

spliterator() :返回此元素的分割迭代器



基本都是针对数组的操作,利用ReentrantLock和Condition进行控制。

下面主要看它的迭代器。

Itr

感觉ArrayBlockingQueue里面的迭代器非常有特色,不同于普通的集合类里面那么简单,从阅读类的源码来看,它是线程安全的,不会抛出
ConcurrentModificationException
。另一方面,ArrayBlockingQueue里面数组下标是循环利用的,可以理解为是条循环队列。

所以,所有的迭代器共享数据,队列改变会影响所有的迭代器。为了保证正确,增加了许多复杂的操作,但是由于循环数组和一些内部移除会导致迭代器丢失它们的位置,或显示一些它们不应该显示的元素。

比如,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响,可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。

为了避免这个情况,同时也为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:

跟踪循环的次数。即 takeIndex为0的次数。

每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。

下面就是它的主要字段:

cursor:主要指向下一个元素

nextItem:指向下一个元素

nextIndex:nextItem的index

lastItem:最后一个元素

lastRet:最后一个元素的索引

prevTakeIndex:takeIndex的前一个位置

prevCycles:itrs监控前一个的循环数量cycles的值

NONE = -1:none模式,代表节点不存在或者没有

REMOVED = -2:说明当前节点被其他线程调用remove模式删除了

DETACHED = -3:说明处于detached模式

接下来看它的构造方法:

Itr() {
lastRet = NONE;       //最后一个索引为NONE
final ReentrantLock lock = ArrayBlockingQueue.this.lock;        //获取外部类的锁。
lock.lock();              //加锁
try {
if (count == 0) {     //当队列里面实际是没有数据的
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);  //清理无用的迭代器
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}


上面构造方法是什么意思呢?

 
count
等于0的时候,就说明队列里面没有数据,那么创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。

  而
doSomeSweeping
主要用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,继续找下一个。这就完成了对无效迭代器的清理了。下面看看它的主要代码:

void doSomeSweeping(boolean tryHarder) {
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;     //判断要尝试几次去清扫。
Node o, p;
final Node sweeper = this.sweeper;
boolean passedGo;   // to limit search to one full sweep

if (sweeper == null) {       //初始化o,p,以及passedGo
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}

for (; probes > 0; probes--) {          //循环次数。
if (p == null) {
if (passedGo)
break;
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.isDetached()) {     //这个iterator是null,或者已经处于detached模式了。
// found a discarded/exhausted iterator
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
p.clear();
p.next = null;
if (o == null) {       //说明是第一个迭代器
head = next;
if (next == null) {        //itrs里面是空的了。
// We've run out of iterators to track; retire
itrs = null;
return;
}
}
else
o.next = next;     //o指向前一个清扫过的p
} else {
o = p;     //把p赋值给o,
}
p = next;     //p往后面串一个。
}

this.sweeper = (p == null) ? null : o;       //判断p,并给sweeper赋值。
}


下面主要看负责管理Iterator的Itrs类。

Itrs

先看结构:

class Itrs {

private class Node extends WeakReference<Itr> {
Node next;  //指向下一个节点
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}

//
int cycles = 0;

//头节点head。
private Node head;

//用来去删除废弃的iterators。
private Node sweeper = null;
//尝试次数
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;


里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference),具体关于Java各种引用可看:Java中强引用、软引用、弱引用、虚引用

上文的add操作并没有调用itrs的相关操作。

在remove方法里面有调用,这里具体分析下:

而在
void removeAt(final int removeIndex)
,删除特定位置的元素方法里面调用了
itrs.elementDequeued()
; 接下来看
elementDequeued
方法:

void elementDequeued() {
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}


当count为0时候,调用queueIsEmpty:

void queueIsEmpty() {
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}


而在
queueIsEmpty
里面,则需要把itrs里面的所有node检查以便,如果此时里面的某一个iterator不为null,调用shutdown方法,shutdown方法里面则是把Iterator里面的状态标志初始化:

void shutdown() {
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
if (lastRet >= 0) {
lastRet = REMOVED;
lastItem = null;
}
prevTakeIndex = DETACHED;
}


而在
elementDequeued
里面的第二个条件中,从外部类的
takeIndex
判断是否为0,从而判断是否能够拿东西(或者循环了一圈回到原点),如果不能拿,则调用
takeIndexWrapped
方法:

boolean takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {   //
shutdown();
return true;
}
return false;
}


isDetached方法就是判断takeIndex的前一个元素是不是小于0,即takeIndex是不是为0。

所以对于remove方法里面,itrs做的主要事情如下:

队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除。

否者如果takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取

心得

总的来说,就增加修改逻辑来说,ArrayBlockingQueue并不难理解,主要逻辑就是ReentrantLock+Condition+数组,而里面比较相对于其他有特点的就是Iterator的实现,以及利用Itrs对Iterator进行管理的过程。

append

里面变量并没有用volatile来保证诸如count,putIndex,takeIndex的可见性。

另一方面,由于是加锁的阻塞队列,所以性能上是有缺陷的,但是功能上确实很好的,生产者消费者模型。

参考文章:

1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html

2. http://www.cnblogs.com/lighten/p/7427763.html

3. http://www.infoq.com/cn/articles/java-blocking-queue/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: