Java并发容器之LinkedBlockingQueue
2017-06-29 13:12
381 查看
上一篇我们学习了ArrayBlockingQueue的实现原理,这一篇我们来学习与之对应的LinkedBlockingQueue。很明显,ArrayBlockingQueue内部是基于数组实现的,而LinkedBlockingQueue是基于链表。他们都实现了阻塞队列的take和put方法,下面我们会结合ArrayBlockingQueue作对比,来分析LinkedBlockingQueue的实现原理。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于
2 LinkedBlockingQueue类图结构
1)int capacity :确定了队列的容量,当指定容量后,不可超过。默认为Integer.Max_VALUE。
2)AtomicInteger count:队列元素数量,采用原子整数,这里与ArrayBlockingQueue不同(后者使用 int),因为LinkedBlockingQueue的写入。读出操作使用了两个不同的锁,是可以并行操作的,因此需要原子类来保证更新的原子性。
3)内部队列实现使用的是Node<E>节点,与LinkedList相似。
4)最重要一点,与ArrayBlockingQueue不同,那就是获取和插入分成了两个锁。
构造函数主要是1)确定队列大小,默认为Integer.Max_VALUE;2)设置头节点;3)设置尾节点。且头尾节点一开始指向null。
这里讨论下,为什么插入操作后也要通知写入线程:因为A、B线程同时写,A获得锁,B被阻塞,因此,A完成后需要立刻通知B线程写入,而不是等到读取线程给B通知。
peek方法从头节点直接就可以获取到第一个添加的元素,所以效率是比较高的。如果不存在则返回null。
由于内部实现的不同,具有两把锁的LinkedBlockingQueue拥有较大的吞吐量,但是在大多数情况下,性能要低于ArrayBlockingQueue。
1 LinkedBlockingQueue简介
LinkedBlockingQueue是一个基于链表的,范围任意的BlockingQueue。此队列按照FIFO规则排序。队列的头部是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于
Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
2 LinkedBlockingQueue类图结构
1)int capacity :确定了队列的容量,当指定容量后,不可超过。默认为Integer.Max_VALUE。2)AtomicInteger count:队列元素数量,采用原子整数,这里与ArrayBlockingQueue不同(后者使用 int),因为LinkedBlockingQueue的写入。读出操作使用了两个不同的锁,是可以并行操作的,因此需要原子类来保证更新的原子性。
3)内部队列实现使用的是Node<E>节点,与LinkedList相似。
4)最重要一点,与ArrayBlockingQueue不同,那就是获取和插入分成了两个锁。
3 构造方法
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
构造函数主要是1)确定队列大小,默认为Integer.Max_VALUE;2)设置头节点;3)设置尾节点。且头尾节点一开始指向null。
4 添加元素
offer方法
将指定元素插入队尾,成功返回true,方法返回。若队列已满,则插入失败,返回false。public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) //队列已满,则返回false return false; int c = -1; Node<E> node = new Node<E>(e); //构造节点 final ReentrantLock putLock = this.putLock; //取得写入锁 putLock.lock(); try { if (count.get() < capacity) { //需要在写入之前再次判断 enqueue(node); //入队 c = count.getAndIncrement(); //原子操作,防止读线程竞争冲突 if (c + 1 < capacity) notFull.signal(); //如果未满,通知写入线程 } } finally { putLock.unlock(); //解锁 } if (c == 0) //如果写入成功 signalNotEmpty(); //通知读取线程 return c >= 0; }
这里讨论下,为什么插入操作后也要通知写入线程:因为A、B线程同时写,A获得锁,B被阻塞,因此,A完成后需要立刻通知B线程写入,而不是等到读取线程给B通知。
put方法
将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。源码如下:public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); //如果中断,则抛出异常并退出。避免了取得锁后调用wait时才发现中断 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { //每次唤醒则获取锁 检查容量 容量已满则继续阻塞 notFull.await(); } enqueue(node); //容量未满 插入操作 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); //未满 通知插入线程 } finally { putLock.unlock(); //解锁 } if (c == 0) signalNotEmpty(); //通知读取线程 }
5 获取元素
peek方法
获取但不移除头元素,若为空返回null。源码如下:public E peek() { if (count.get() == 0) //元素为空返回null return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //获取锁 try { Node<E> first = head.next; //头结点的next即为第一个结点 if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
peek方法从头节点直接就可以获取到第一个添加的元素,所以效率是比较高的。如果不存在则返回null。
poll方法
poll方法获取并移除此队列的头,如果此队列为空,则返回 null。源码如下:public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; //获取锁 takeLock.lock(); //加锁 try { if (count.get() > 0) { //如果有元素 则取出 x = dequeue(); c = count.getAndDecrement(); //更新count if (c > 1) notEmpty.signal(); //通知其他读取线程 } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); //通知写线程 return x; }
take方法
获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。源码如下:public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取锁 takeLock.lockInterruptibly(); //加锁 try { while (count.get() == 0) { //若已经空,则阻塞 notEmpty.await(); } x = dequeue(); //出队 c = count.getAndDecrement(); if (c > 1) //若队列中还有元素 则唤醒其他读取线程 notEmpty.signal(); } finally { takeLock.unlock(); //解锁 } if (c == capacity) //若不满 唤醒插入线程 signalNotFull(); return x; }
6 总结
ArrayBlockingQueue和LinkedBlockingQueue的不同
1)内部实现不同
ArrayBlockingQueue使用数组,而LinkedBlockingQueue使用链表。2)锁的实现不同
ArrayBlockingQueue入队出队都使用同一把锁,而LinkedBlockingQueue使用了两把锁。因此,ArrayBlockingQueue的入队出队操作是同步的,而LinkedBlockingQueue是可以并行的。这里的根本原因是:插入操作时,LinkedBlockingQueue的head不会影响head节点,而出队操作也不会影响tail节点。所以可以并行。3)初始化条件不同
ArrayBlockingQueue需要确定队列大小而LinkedBlockingQueue不需要,具有默认值Integer.Max_VALUE。4)性能
由于内部实现的不同,具有两把锁的LinkedBlockingQueue拥有较大的吞吐量,但是在大多数情况下,性能要低于ArrayBlockingQueue。
相关文章推荐
- Java 7之多线程并发容器 - LinkedBlockingQueue
- Java并发容器ConcurrentHashMap、ConcurrentLinkedQueue、BlockingQueue等
- 第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)
- Java并发集合——ArrayBlockingQueue ,LinkedBlockingQueue,ConcurrentHashMap
- Java并发容器之非阻塞队列ConcurrentLinkedQueue
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- Java并发之BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)
- 深入浅出 Java Concurrency (20): 并发容器 part 5 ConcurrentLinkedQueue
- Java同步并发容器队列(非阻塞队列与阻塞队列)BlockingQueue家族
- java并发容器(Map、List、BlockingQueue)
- java并发容器(Map、List、BlockingQueue)
- java并发容器(Map、List、BlockingQueue)
- Java并发容器:ConcurrentLinkedQueue
- 详细分析Java并发集合LinkedBlockingQueue的用法
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- JAVA并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- java并发容器(Map、List、BlockingQueue)
- java并发容器(Map、List、BlockingQueue)详解
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue