Java并发学习(二十二)-ArrayBlockingQueue分析
2018-01-07 10:06
609 查看
这两天花了几个小时来看ArrayBlockingQueue,阻塞队列。其实它的实现思想是比较简单的,主要是利用ReentrantLock和Condition来实现。首先理解什么是阻塞队列:
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
在队列为空时,获取元素的线程会等待队列变为非空。
当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
也就是说,阻塞队列,这里以ArrayBlockQueue来说明,用生产者消费者模型来理解再好不过了。当队列里有东西,你可以进行取操作,没有东西时候,你就需要阻塞起来,等有东西再取。
接下来看里面定义:
上述代码中,大体都有注释,我觉得比较值得思考的有普通的两个index,两个Condition,当然还有一个
itrs这个变量,感觉是个比较新的思想,下文会聊。
最终父类的add会调用子类具体实现的offer方法:
在offer方法里面,会用lock进行加锁,也就是一次只能一个线程对queue进行操作,最终,会调用enqueue方法:
插入的就是简单的利用putIndex,在数组里面新增加一个元素,然后调用
跟所有在等待取数的线程说,喂,有东西了,来拿吧!
从上文add相关代码来看,还是比较好理解的,存储结构是数组,逻辑结构是队列,利用ReentrantLock进行加锁。
使用Condition实现等待/通知模式
实现逻辑基本一致,注意一旦队列满了,需要调用
同样如果count==0,则需要等待。接下来看dequeue方法:
同样是针对数组的简单出队,从队尾出。会调用一次
remainingCapacity() : 返回数组的甚于容量
remove(Object o) :删除某一个特定值为o的元素
size() :返回该元素的数量
spliterator() :返回此元素的分割迭代器
…
基本都是针对数组的操作,利用ReentrantLock和Condition进行控制。
下面主要看它的迭代器。
所以,所有的迭代器共享数据,队列改变会影响所有的迭代器。为了保证正确,增加了许多复杂的操作,但是由于循环数组和一些内部移除会导致迭代器丢失它们的位置,或显示一些它们不应该显示的元素。
比如,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响,可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。
为了避免这个情况,同时也为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:
跟踪循环的次数。即 takeIndex为0的次数。
每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。
下面就是它的主要字段:
cursor:主要指向下一个元素
nextItem:指向下一个元素
nextIndex:nextItem的index
lastItem:最后一个元素
lastRet:最后一个元素的索引
prevTakeIndex:takeIndex的前一个位置
prevCycles:itrs监控前一个的循环数量cycles的值
NONE = -1:none模式,代表节点不存在或者没有
REMOVED = -2:说明当前节点被其他线程调用remove模式删除了
DETACHED = -3:说明处于detached模式
接下来看它的构造方法:
上面构造方法是什么意思呢?
而
下面主要看负责管理Iterator的Itrs类。
里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference),具体关于Java各种引用可看:Java中强引用、软引用、弱引用、虚引用
上文的add操作并没有调用itrs的相关操作。
在remove方法里面有调用,这里具体分析下:
而在
当count为0时候,调用queueIsEmpty:
而在
而在
isDetached方法就是判断takeIndex的前一个元素是不是小于0,即takeIndex是不是为0。
所以对于remove方法里面,itrs做的主要事情如下:
队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除。
否者如果takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取
另一方面,由于是加锁的阻塞队列,所以性能上是有缺陷的,但是功能上确实很好的,生产者消费者模型。
参考文章:
1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html
2. http://www.cnblogs.com/lighten/p/7427763.html
3. http://www.infoq.com/cn/articles/java-blocking-queue/
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
在队列为空时,获取元素的线程会等待队列变为非空。
当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
也就是说,阻塞队列,这里以ArrayBlockQueue来说明,用生产者消费者模型来理解再好不过了。当队列里有东西,你可以进行取操作,没有东西时候,你就需要阻塞起来,等有东西再取。
What is ArrayBlockingQueue
首先从名字里面可以看出来,ArrayBlockingQueue里面主要的数据结构就是一个数组,使用ReentrantLock对其进行加锁,使用Condition实现存取等待。接下来看里面定义:
/** * FIFO特性。 * head元素是队列里面存在最久的元素。 * 非空 * 一旦创建,大小就不能改变。 * 如果队列满了,再想入対就会阻塞 * 支持公平锁策略。默认是非公平的。 */ public class ArrayBlockingQueueAnlysis<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; //用来存items int takeIndex; //出队,获取的下一个index。 int putIndex; //add或者put的下一个index int count; //元素的个数 final ReentrantLock lock; //主要的一把锁 private final Condition notEmpty; //空的condition private final Condition notFull; //满的condition transient Itrs itrs = null; //用于以链表方式存储所有已经创建的iterator。 ... }
上述代码中,大体都有注释,我觉得比较值得思考的有普通的两个index,两个Condition,当然还有一个
itrs这个变量,感觉是个比较新的思想,下文会聊。
add操作
由add操作:public boolean add(E e) { return super.add(e); }
最终父类的add会调用子类具体实现的offer方法:
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); //加锁。 try { if (count == items.length) return false; else { enqueue(e); //入队。 return true; } } finally { lock.unlock(); } }
在offer方法里面,会用lock进行加锁,也就是一次只能一个线程对queue进行操作,最终,会调用enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
插入的就是简单的利用putIndex,在数组里面新增加一个元素,然后调用
notEmpty.signal();
跟所有在等待取数的线程说,喂,有东西了,来拿吧!
从上文add相关代码来看,还是比较好理解的,存储结构是数组,逻辑结构是队列,利用ReentrantLock进行加锁。
使用Condition实现等待/通知模式
put操作
对于add,还有一个类似的put操作:public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //如果满了,调用notFull这个condition加入队列睡眠等待。 enqueue(e); } finally { lock.unlock(); } }
实现逻辑基本一致,注意一旦队列满了,需要调用
notFull.await()等待。
take操作
现在看看出队的take操作:public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
同样如果count==0,则需要等待。接下来看dequeue方法:
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; //当前位置置null,有利于回收。 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //把itrs里面也出队一个。 notFull.signal(); return x; }
同样是针对数组的简单出队,从队尾出。会调用一次
itrs.elementDequeued();下文提到。
其他方法
里面还有一些其他方法例如:remainingCapacity() : 返回数组的甚于容量
remove(Object o) :删除某一个特定值为o的元素
size() :返回该元素的数量
spliterator() :返回此元素的分割迭代器
…
基本都是针对数组的操作,利用ReentrantLock和Condition进行控制。
下面主要看它的迭代器。
Itr
感觉ArrayBlockingQueue里面的迭代器非常有特色,不同于普通的集合类里面那么简单,从阅读类的源码来看,它是线程安全的,不会抛出ConcurrentModificationException。另一方面,ArrayBlockingQueue里面数组下标是循环利用的,可以理解为是条循环队列。
所以,所有的迭代器共享数据,队列改变会影响所有的迭代器。为了保证正确,增加了许多复杂的操作,但是由于循环数组和一些内部移除会导致迭代器丢失它们的位置,或显示一些它们不应该显示的元素。
比如,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响,可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。
为了避免这个情况,同时也为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:
跟踪循环的次数。即 takeIndex为0的次数。
每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。
下面就是它的主要字段:
cursor:主要指向下一个元素
nextItem:指向下一个元素
nextIndex:nextItem的index
lastItem:最后一个元素
lastRet:最后一个元素的索引
prevTakeIndex:takeIndex的前一个位置
prevCycles:itrs监控前一个的循环数量cycles的值
NONE = -1:none模式,代表节点不存在或者没有
REMOVED = -2:说明当前节点被其他线程调用remove模式删除了
DETACHED = -3:说明处于detached模式
接下来看它的构造方法:
Itr() { lastRet = NONE; //最后一个索引为NONE final ReentrantLock lock = ArrayBlockingQueue.this.lock; //获取外部类的锁。 lock.lock(); //加锁 try { if (count == 0) { //当队列里面实际是没有数据的 cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); cursor = incCursor(takeIndex); if (itrs == null) { itrs = new Itrs(this); } else { itrs.register(this); // in this order itrs.doSomeSweeping(false); //清理无用的迭代器 } prevCycles = itrs.cycles; } } finally { lock.unlock(); } }
上面构造方法是什么意思呢?
count等于0的时候,就说明队列里面没有数据,那么创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。
而
doSomeSweeping主要用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,继续找下一个。这就完成了对无效迭代器的清理了。下面看看它的主要代码:
void doSomeSweeping(boolean tryHarder) { int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; //判断要尝试几次去清扫。 Node o, p; final Node sweeper = this.sweeper; boolean passedGo; // to limit search to one full sweep if (sweeper == null) { //初始化o,p,以及passedGo o = null; p = head; passedGo = true; } else { o = sweeper; p = o.next; passedGo = false; } for (; probes > 0; probes--) { //循环次数。 if (p == null) { if (passedGo) break; o = null; p = head; passedGo = true; } final Itr it = p.get(); final Node next = p.next; if (it == null || it.isDetached()) { //这个iterator是null,或者已经处于detached模式了。 // found a discarded/exhausted iterator probes = LONG_SWEEP_PROBES; // "try harder" // unlink p p.clear(); p.next = null; if (o == null) { //说明是第一个迭代器 head = next; if (next == null) { //itrs里面是空的了。 // We've run out of iterators to track; retire itrs = null; return; } } else o.next = next; //o指向前一个清扫过的p } else { o = p; //把p赋值给o, } p = next; //p往后面串一个。 } this.sweeper = (p == null) ? null : o; //判断p,并给sweeper赋值。 }
下面主要看负责管理Iterator的Itrs类。
Itrs
先看结构:class Itrs { private class Node extends WeakReference<Itr> { Node next; //指向下一个节点 Node(Itr iterator, Node next) { super(iterator); this.next = next; } } // int cycles = 0; //头节点head。 private Node head; //用来去删除废弃的iterators。 private Node sweeper = null; //尝试次数 private static final int SHORT_SWEEP_PROBES = 4; private static final int LONG_SWEEP_PROBES = 16;
里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference),具体关于Java各种引用可看:Java中强引用、软引用、弱引用、虚引用
上文的add操作并没有调用itrs的相关操作。
在remove方法里面有调用,这里具体分析下:
而在
void removeAt(final int removeIndex),删除特定位置的元素方法里面调用了
itrs.elementDequeued(); 接下来看
elementDequeued方法:
void elementDequeued() { if (count == 0) queueIsEmpty(); else if (takeIndex == 0) takeIndexWrapped(); }
当count为0时候,调用queueIsEmpty:
void queueIsEmpty() { for (Node p = head; p != null; p = p.next) { Itr it = p.get(); if (it != null) { p.clear(); it.shutdown(); } } head = null; itrs = null; }
而在
queueIsEmpty里面,则需要把itrs里面的所有node检查以便,如果此时里面的某一个iterator不为null,调用shutdown方法,shutdown方法里面则是把Iterator里面的状态标志初始化:
void shutdown() { cursor = NONE; if (nextIndex >= 0) nextIndex = REMOVED; if (lastRet >= 0) { lastRet = REMOVED; lastItem = null; } prevTakeIndex = DETACHED; }
而在
elementDequeued里面的第二个条件中,从外部类的
takeIndex判断是否为0,从而判断是否能够拿东西(或者循环了一圈回到原点),如果不能拿,则调用
takeIndexWrapped方法:
boolean takeIndexWrapped() { // assert lock.getHoldCount() == 1; if (isDetached()) return true; if (itrs.cycles - prevCycles > 1) { // shutdown(); return true; } return false; }
isDetached方法就是判断takeIndex的前一个元素是不是小于0,即takeIndex是不是为0。
所以对于remove方法里面,itrs做的主要事情如下:
队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除。
否者如果takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取
心得
总的来说,就增加修改逻辑来说,ArrayBlockingQueue并不难理解,主要逻辑就是ReentrantLock+Condition+数组,而里面比较相对于其他有特点的就是Iterator的实现,以及利用Itrs对Iterator进行管理的过程。append
里面变量并没有用volatile来保证诸如count,putIndex,takeIndex的可见性。另一方面,由于是加锁的阻塞队列,所以性能上是有缺陷的,但是功能上确实很好的,生产者消费者模型。
参考文章:
1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html
2. http://www.cnblogs.com/lighten/p/7427763.html
3. http://www.infoq.com/cn/articles/java-blocking-queue/
相关文章推荐
- Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- 详细分析Java并发集合ArrayBlockingQueue的用法
- 【死磕Java并发】-----分析 ArrayBlockingQueue 构造函数加锁问题
- Java并发学习笔记(七)-ArrayBlockingQueue
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- Java并发学习(二十四)-PriorityBlockingQueue分析
- java 队列阻塞方法ArrayBlockingQueue学习
- Java 7之多线程并发容器 - ArrayBlockingQueue
- Java并发----ArrayBlockingQueue
- Java 集合框架分析:ArrayBlockingQueue java1.8
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java并发容器之ArrayBlockingQueue
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍
- Java 容器源码分析之ArrayBlockingQueue和LinkedBlockingQueue
- Java LinkedBlockingQueue和ArrayBlockingQueue分析
- Thread学习(九) 并发的Queen学习ArrayBlockingQueue,LinkedBlockingQueue
- 黑马程序员——高新技术—java5并发库之ArrayBlockingQueue
- Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析