JDK并发工具类源码学习系列——LinkedBlockingQueue
2015-11-23 19:04
573 查看
LinkedBlockingQueue是一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
[b]使用场景[/b]
LinkedBlockingQueue常用于生产者/消费者模式中,作为生产者和消费者的通信桥梁。LinkedBlockingQueue与之前介绍的ConcurrentLinkedQueue以及PriorityBlockingQueue功能类似,都是Queue的一种,不同之处是:
LinkedBlockingQueue和PriorityBlockingQueue是阻塞的,而ConcurrentLinkedQueue是非阻塞的,
同时LinkedBlockingQueue和PriorityBlockingQueue通过加锁实现线程安全,而ConcurrentLinkedQueue使用CAS实现无锁模式
PriorityBlockingQueue支持优先级
由于不同的特征,所以以上三者的使用场景也不同:
LinkedBlockingQueue适合需要阻塞的队列场景,如果能不阻塞或者可以通过代码自行实现阻塞,那么建议使用ConcurrentLinkedQueue代替
ConcurrentLinkedQueue适合对性能要求较高,同时无需阻塞的场景使用
PriorityBlockingQueue适合需要根据任务的不同优先级进行调整队列的顺序的场景
[b]结构预览[/b]
LinkedBlockingQueue内部实现相对较简单,直接使用一个链表存储数据,通过加锁实现线程安全,通过两个Condition分别实现入队和出队的等待。链表的节点使用内部类:Node表示,Node很简单,就两个变量,由外部类直接修改即可。
item使用volatile修饰,解决内存可见性。
[b]常用方法解析[/b]
LinkedBlockingQueue常用方法有:入队(offer(E)/offer(E, long, TimeUnit)/put(E))、出队(poll()/poll(long, TimeUnit)/take())、删除(remove(Object))。下面分别看看这三类方法。
[b]入队[/b]
首先解析offer(),另外两个入队操作只是在队列已满的情况下进行一些特殊处理而已。文中代码给出了详细注释,这里着重说明两个地方:
对Condition的操作需要在加锁的环境下进行,而且是需要对与Condition相关的锁进行加锁,如此处notEmpty是由takeLock.newCondition()得来,所以对notEmpty的操作需要对takeLock进行加锁
入队操作也执行
下面直接贴出offer(E, long, TimeUnit)和put(E)的代码,基本同offer(E)。
[b]出队[/b]
出队操作和入队逻辑相同,看代码。
出队一个元素:
[b]删除[/b]
删除的逻辑也很简单,代码中给出了注释。
以上即本篇全部内容,比较简单,更多关于队列的研究可参考:
JDK并发工具类源码学习系列——ConcurrentLinkedQueue
JDK并发工具类源码学习系列——PriorityBlockingQueue
以上内容如有错误,请不吝赐教~
欢迎访问我的个人博客,寻找更多乐趣~
[b]使用场景[/b]
LinkedBlockingQueue常用于生产者/消费者模式中,作为生产者和消费者的通信桥梁。LinkedBlockingQueue与之前介绍的ConcurrentLinkedQueue以及PriorityBlockingQueue功能类似,都是Queue的一种,不同之处是:
LinkedBlockingQueue和PriorityBlockingQueue是阻塞的,而ConcurrentLinkedQueue是非阻塞的,
同时LinkedBlockingQueue和PriorityBlockingQueue通过加锁实现线程安全,而ConcurrentLinkedQueue使用CAS实现无锁模式
PriorityBlockingQueue支持优先级
由于不同的特征,所以以上三者的使用场景也不同:
LinkedBlockingQueue适合需要阻塞的队列场景,如果能不阻塞或者可以通过代码自行实现阻塞,那么建议使用ConcurrentLinkedQueue代替
ConcurrentLinkedQueue适合对性能要求较高,同时无需阻塞的场景使用
PriorityBlockingQueue适合需要根据任务的不同优先级进行调整队列的顺序的场景
[b]结构预览[/b]
LinkedBlockingQueue内部实现相对较简单,直接使用一个链表存储数据,通过加锁实现线程安全,通过两个Condition分别实现入队和出队的等待。链表的节点使用内部类:Node表示,Node很简单,就两个变量,由外部类直接修改即可。
/** * Linked list node class */ static class Node<E> { /** The item, volatile to ensure barrier separating write and read */ volatile E item; Node<E> next; Node(E x) { item = x; } }
item使用volatile修饰,解决内存可见性。
[b]常用方法解析[/b]
LinkedBlockingQueue常用方法有:入队(offer(E)/offer(E, long, TimeUnit)/put(E))、出队(poll()/poll(long, TimeUnit)/take())、删除(remove(Object))。下面分别看看这三类方法。
[b]入队[/b]
/** * @By Vicky:入队,无阻塞,队列未满则直接入队,否则直接返回false */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count;// 保存当前队列的长度 // 这里因为count是Atomic的,所以有类似volatile的内存可见性效果 // 即对count的修改能够立即被其他线程可见,所以此处不加锁的情况下读取count值是会读取到最新值的 // 然后根据此值进行前置判断,避免不必要的加锁操作 if (count.get() == capacity)// 队列已满直接返回false return false; int c = -1; final ReentrantLock putLock = this.putLock;// 获取putLock,加锁 putLock.lock(); try { if (count.get() < capacity) {// 队列未满则插入 insert(e); c = count.getAndIncrement();// 更新count值 if (c + 1 < capacity)// 未满则唤醒等待在notFull上的线程 // 此处有点怪异,入队唤醒notFull~ // 此处唤醒notFull是考虑有可能如果多个线程同时出队,由于出队唤醒notFull时也需要对putLock进行加锁 // 所以有可能一个线程出队,唤醒notFull,但是被另一个出队线程抢到了锁,所以入队线程依旧在等待 // 当另一个线程也唤醒了notFull,释放了putLock后,只能唤醒一个入队线程,所以其他线程依旧在等待 // 所以此处需要再次唤醒notFull notFull.signal(); } } finally { putLock.unlock(); } // c==0表示队列在插入之前是空的,所以需要唤醒等待在notEmpty上的线程 if (c == 0) signalNotEmpty(); return c >= 0; } /** * @By Vicky:唤醒notEmpty,需对takeLock进行加锁,因为notEmpty与takeLock相关 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
首先解析offer(),另外两个入队操作只是在队列已满的情况下进行一些特殊处理而已。文中代码给出了详细注释,这里着重说明两个地方:
对Condition的操作需要在加锁的环境下进行,而且是需要对与Condition相关的锁进行加锁,如此处notEmpty是由takeLock.newCondition()得来,所以对notEmpty的操作需要对takeLock进行加锁
入队操作也执行
notFull.signal();的原因是避免入队线程未抢到锁而遗失了出队的唤醒操作。详细解析可以见文中的注释
下面直接贴出offer(E, long, TimeUnit)和put(E)的代码,基本同offer(E)。
/** * @By Vicky:入队,等待指定时间 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { for (;;) { // 此处同offer() if (count.get() < capacity) { insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); break; } // nanos是剩余的等待时间,<=0表示等待时间已到 if (nanos <= 0) return false; try { // 调用notFull的awaitNanos,指定等待时间,如果等待期间被唤醒,则返回剩余等待时间,<0表示等待时间已到 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } /** * @By Vicky:入队,无期限等待 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset // local var holding count negative to indicate failure unless set. int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { try { while (count.get() == capacity)// 无限等待,直到可用 notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
[b]出队[/b]
出队操作和入队逻辑相同,看代码。
/** * @By Vicky:出队,无阻塞,队列为空则直接返回null */ public E poll() { final AtomicInteger count = this.count; if (count.get() == 0)// 队列为空,直接返回 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) {// 不为空,获取一个元素 x = extract(); c = count.getAndDecrement(); if (c > 1)// 同offer(),此处需唤醒notEmpty notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull();// 同offer(),此处需唤醒notFull return x; } /** * @By Vicky:出队,将head指向head.next * @return */ private E extract() { Node<E> first = head.next; head = first; E x = first.item; first.item = null; return x; } /** * @By Vicky:唤醒notFull,需对putLock进行加锁,因为notFull与putLock相关 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
出队一个元素:
extract(),逻辑很简单,将head指向head.next即可。其他地方与offer()的逻辑相同,如队列未空需唤醒notEmpty,队列由满变空需唤醒notFull,原因完全同offer()。poll(long, TimeUnit)和take()代码就不贴出来了,完全与offer()相同。
[b]删除[/b]
/** * @By Vicky:删除指定元素 */ public boolean remove(Object o) { if (o == null) return false; boolean removed = false; fullyLock();// 同时对takeLock和pullLock加锁,避免任何的入队和出队操作 try { Node<E> trail = head; Node<E> p = head.next; while (p != null) {// 从队列的head开始循环查找与o相同的元素 if (o.equals(p.item)) {// 找到相同的元素则设置remove为true removed = true; break; } trail = p;// 继续循环 p = p.next; } if (removed) { // remove==true,则表示查找到待删除元素,即p,将trail的next指向p的next,即将p从队列移除及完成删除 p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signalAll(); } } finally { fullyUnlock(); } return removed; }
删除的逻辑也很简单,代码中给出了注释。
以上即本篇全部内容,比较简单,更多关于队列的研究可参考:
JDK并发工具类源码学习系列——ConcurrentLinkedQueue
JDK并发工具类源码学习系列——PriorityBlockingQueue
以上内容如有错误,请不吝赐教~
欢迎访问我的个人博客,寻找更多乐趣~
相关文章推荐
- 从源码安装Mysql/Percona 5.5
- Ubuntu 安装 JDK 问题
- 浅析Ruby的源代码布局及其编程风格
- C#线程队列用法实例分析
- 算法系列15天速成 第九天 队列
- C语言单链队列的表示与实现实例详解
- 探究在C++程序并发时保护共享数据的问题
- mysql 队列 实现并发读
- C#队列Queue用法实例分析
- C#多线程处理多个队列数据的方法
- asp.net 抓取网页源码三种实现方法
- C语言循环队列的表示与实现实例详解
- C++循环队列实现模型
- C#内置队列类Queue用法实例
- Array栈方法和队列方法的特点说明
- Nodejs实战心得之eventproxy模块控制并发
- JS小游戏之仙剑翻牌源码详解
- JS小游戏之宇宙战机源码详解
- jQuery源码分析之jQuery中的循环技巧详解
- 本人自用的global.js库源码分享