【JUC源码解析】LinkedBlockingQueue
2018-03-11 16:13
489 查看
简介
一个基于链表的阻塞队列,FIFO的顺序,head指向的元素等待时间最长,tail指向的元素等待时间最短,新元素从队列尾部添加,检索元素从队列头部开始,队列的容量,默认是Integer#MAX_VALUE。
源码分析
内部类Node
static class Node<E> { E item; // 结点的值 Node<E> next; // 指向下一个结点 Node(E x) { // 构造方法 item = x; } }
属性
private final int capacity; // 队列的容量,大小 private final AtomicInteger count = new AtomicInteger(); // 当前队列里元素的个数 transient Node<E> head; // 头结点,head.item = null private transient Node<E> last; // 尾结点,last.next = null private final ReentrantLock takeLock = new ReentrantLock(); // 可重入锁,take元素时,需持有该锁 private final Condition notEmpty = takeLock.newCondition(); // take锁上的条件,队列空时等待,不空时通知 private final ReentrantLock putLock = new ReentrantLock(); // 可重入锁,put元素时,需持有该锁 private final Condition notFull = putLock.newCondition(); // put锁上的条件,队列满时等待,不满时通知
通知方法
private void signalNotEmpty() { // 通知在take锁上等待的线程 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); // 加锁 try { notEmpty.signal(); // 通知 } finally { takeLock.unlock(); // 释放 } } private void signalNotFull() { // 通知在put锁上等待的线程 final ReentrantLock putLock = this.putLock; putLock.lock(); // 加锁 try { notFull.signal(); // 通知 } finally { putLock.unlock(); // 释放 } }
元素入队
private void enqueue(Node<E> node) { // 队尾入队 last = last.next = node; // last的next域指向新结点,last后移(指向新加入的结点) }
元素出队
private E dequeue() { // 队首出队 Node<E> h = head; // 获得头结点 Node<E> first = h.next; // 活动第一个有效(item != null)结点(head结点的next结点) h.next = h; // next域指向自己,帮助GC head = first; // head后移 E x = first.item; // 取得结点值 first.item = null; // 置空 return x; // 返回 }
加锁与释放
void fullyLock() { // 加锁 putLock.lock(); takeLock.lock(); } void fullyUnlock() { // 释放 takeLock.unlock(); putLock.unlock(); }
构造方法
public LinkedBlockingQueue() { // 构造方法 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { // 构造方法 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); // 初始时,last和head指向一个DUMMY结点 } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 加锁,可见性 try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); // 空指针 if (n == capacity) throw new IllegalStateException("Queue full"); // 越界 enqueue(new Node<E>(e)); // 元素入队 ++n; // 递增 } count.set(n); // 设置当前队列里元素的个数 } finally { putLock.unlock(); // 解锁 } }
添加元素
put(E e)
public void put(E e) throws InterruptedException { // 添加元素 if (e == null) throw new NullPointerException(); // 空指针 int c = -1; Node<E> node = new Node<E>(e); // 创建新结点 final ReentrantLock putLock = this.putLock; // 获得put锁 final AtomicInteger count = this.count; // 获得当前元素的个数 putLock.lockInterruptibly(); // 加锁,响应中断 try { while (count.get() == capacity) { // 队列满了 notFull.await(); // 要等一等 } enqueue(node); // 入队 c = count.getAndIncrement(); // 获取队列的容量 if (c + 1 < capacity) // 不满,唤醒等待的线程 notFull.signal(); // 通知 } finally { putLock.unlock(); // 解锁 } if (c == 0) // 队列非空(c初始值为-1) signalNotEmpty(); }
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); // 空指针 long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; // 获得put锁 final AtomicInteger count = this.count; // 获得当前元素的个数 putLock.lockInterruptibly(); // 加锁,响应中断 try { while (count.get() == capacity) { // 队列满了 if (nanos <= 0) // 超时,返回 return false; nanos = notFull.awaitNanos(nanos); // 等待响应的时间 } enqueue(new Node<E>(e)); // 入队 c = count.getAndIncrement(); // 获取队列的容量 if (c + 1 < capacity) // 不满,唤醒等待的线程 notFull.signal(); // 通知 } finally { putLock.unlock(); // 解锁 } if (c == 0) signalNotEmpty(); // 队列非空(c初始值为-1) return true; }
offer(E e)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 空指针 final AtomicInteger count = this.count; // 获得当前元素的个数 if (count.get() == capacity) // 队列满了,直接返回失败 return false; int c = -1; Node<E> node = new Node<E>(e); // 新建结点 final ReentrantLock putLock = this.putLock; // 获得put锁 putLock.lock(); // 加锁 try { if (count.get() < capacity) { // 不满 enqueue(node); // 入队 c = count.getAndIncrement(); // 加1 if (c + 1 < capacity) // 不满,通知 notFull.signal(); } } finally { putLock.unlock(); // 解锁 } if (c == 0) signalNotEmpty(); // 不空,通知 return c >= 0; }
获取元素
take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // 当前队列元素个数 final ReentrantLock takeLock = this.takeLock; // 获取take锁 takeLock.lockInterruptibly(); // 加锁,响应中断 try { while (count.get() == 0) { // 队列空了 notEmpty.await(); // 等待 } x = dequeue(); // 出队 c = count.getAndDecrement(); // 减1 if (c > 1) // 不空 notEmpty.signal(); // 通知 } finally { takeLock.unlock(); // 解锁 } if (c == capacity) // 获取元素之前,队列是满的,有线程在put元素时阻塞,当前线程take一个元素后,空出一个位置 signalNotFull(); // 通知 return x; }
poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); // 计算等待时间 final AtomicInteger count = this.count; // 当前队列元素个数 final ReentrantLock takeLock = this.takeLock; // 获得take锁 takeLock.lockInterruptibly(); // 加锁,响应中断 try { while (count.get() == 0) { // 队列空了 if (nanos <= 0) // 超时 return null; nanos = notEmpty.awaitNanos(nanos); // 等待指定时间 } x = dequeue(); // 出队 c = count.getAndDecrement(); // 个数减1 if (c > 1) // 非空 notEmpty.signal(); // 通知 } finally { takeLock.unlock(); } if (c == capacity) // 同take()方法 signalNotFull(); return x; }
poll()
public E poll() { final AtomicInteger count = this.count; // 当前队列元素个数 if (count.get() == 0) // 队列空了,直接返回 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; // 获得take锁 takeLock.lock(); // 加锁 try { if (count.get() > 0) { // 非空 x = dequeue(); // 元素出队 c = count.getAndDecrement(); // 个数减1 if (c > 1) // 非空,通知 notEmpty.signal(); } } finally { takeLock.unlock(); // 解锁 } if (c == capacity) signalNotFull(); // 同take()方法 return x; }
peek()
public E peek() { // 只获取元素,不出队 if (count.get() == 0) // 队列为空,直接返回null return null; final ReentrantLock takeLock = this.takeLock; // 获得take锁 takeLock.lock(); // 解锁 try { Node<E> first = head.next; // 取得第一个有效元素 if (first == null) // 为空,直接返回null return null; else return first.item; // 返回结果 } finally { takeLock.unlock(); // 解锁 } }
剔除结点p
void unlink(Node<E> p, Node<E> trail) { // 剔除结点p p.item = null; // 置空 trail.next = p.next; // 断开p, 连接p的next结点 if (last == p) // 如果p是尾结点,last指针前移 last = trail; if (count.getAndDecrement() == capacity) // 同take()方法 notFull.signal(); }
删除元素
public boolean remove(Object o) { // 删除元素o if (o == null) return false; fullyLock(); // 加锁 try { // 从头结点开始遍历,找寻o元素所在的结点,并从中剔除它 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); // 剔除 return true; } } return false; } finally { fullyUnlock(); // 解锁 } }
迁徙
public int drainTo(Collection<? super E> c, int maxElements) { // 将当前队列里的元素移动到c中,并从当前队列里清除这些元素 if (c == null) throw new NullPointerException(); // 空指针 if (c == this) throw new IllegalArgumentException(); // 不合法参数 if (maxElements <= 0) // 参数校验 return 0; boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; // 获得take锁 takeLock.lock(); // 加锁 try { int n = Math.min(maxElements, count.get()); // 取其中较小值 Node<E> h = head; // 头结点 int i = 0; // 初始值 try { while (i < n) { Node<E> p = h.next; // 取得元素 c.add(p.item); // 添加到集合c中 p.item = null; // 置空 h.next = h; // 结点next域指向自己,帮助GC h = p; // 元素出队 ++i; // 自增 } return n; // 返回 } finally { if (i > 0) { head = h; // 更新头节点 signalNotFull = (count.getAndAdd(-i) == capacity); // 需要通知 } } } finally { takeLock.unlock(); // 解锁 if (signalNotFull) signalNotFull(); // 通知 } }
行文至此结束。
尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_lbq.html
相关文章推荐
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue
- 源码解析关于java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等
- 【JUC源码解析】ConcurrentLinkedQueue
- 【JUC源码解析】PriorityBlockingQueue
- 源码解析:ArrayBlockingQueue和LinkedBlockingQueue的区别
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue
- 05.JUC 集合 - LinkedBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- ArrayBlockingQueue源码解析
- JUC源码分析21-队列-LinkedBlockingDeque
- 阻塞队列和ArrayBlockingQueue源码解析
- 阻塞队列LinkedBlockingQueue源码分析
- ArrayBlockingQueue源码解析
- 第八章 ArrayBlockingQueue源码解析
- 关于java ArrayBlockingQueue 源码解析的小疑惑