LinkedBlockingQueue源码解析
2017-03-03 00:00
393 查看
摘要: 闲来无事 ^_^ 如有错误请指出,避免误人子弟
remove
get
LinkedBlockingQueue源码解析
什么是LinkedBlockingQueue
LinkedBlockingQueue底层是由节点链表实现的定长阻塞队列(阻塞表示如果没有原始那么获取元素会阻塞当前线程)LinkedBlockingQueue用来干嘛
LinkedBlockingQueue一般用于生产者消费者模型业务(排队机制,先进先出)源码解析
数据的存储
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /** * Linked list node class */ static class Node<E> {//存储数据的节点 E item; Node<E> next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity;//链表的最大长度,如果不设置值默认为Integer.MAX_VALUE /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0);//统计数量线程安全 /** * Head of linked list. * Invariant: head.item == null */ private transient Node<E> head;//头节点 /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last;//尾节点 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();//tackLock /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();//tackLock条件不为空 /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();//putLock /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();//putLock条件没满 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//默认last=head=空节点 }
数据的操作
addpublic void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//不能存储空元素 int c = -1; Node<E> node = new Node(e);//创建节点 final ReentrantLock putLock = this.putLock;//获得putLock final AtomicInteger count = this.count;//获取当前数量 putLock.lockInterruptibly();//获取锁,如果有调用Thread.Interrupted()直接抛出异常 try { while (count.get() == capacity) {//如果当前队列以满,进入等待状态 notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public boolean offer(E e, long timeout, TimeUnit unit) offer(e)类似 throws InterruptedException { if (e == null) throw new NullPointerException();//不能存储空元素 long nanos = unit.toNanos(timeout);//装换为纳秒 int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos);//等待一段时间 } enqueue(new Node<E>(e)); c = count.getAndIncrement();//递增 if (c + 1 < capacity)//如果未满唤醒notFull.awit notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();//唤醒notEmpty.await() return true; } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; //拆分为两步 last.next = node,last = node //每次head.next=当前的last然后last.next指向node last = last.next = node; }
remove
public boolean remove(Object o) { if (o == null) return false; fullyLock();//删除数据时全部lock try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next;//前后元素执行,大年元素设置为空 if (last == p) last = trail; if (count.getAndDecrement() == capacity)//count获取数量同时递减(获取数量为递减钱数量) notFull.signal();//唤醒 notFull.await() }
get
//获取元素,消费,可能被中断 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//如果有调用Thread.Interrupted()抛出异常 try { while (count.get() == 0) { notEmpty.await();//元素为空进入等待状态 } x = dequeue();// c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //获取元素,消费 public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //查看元素 public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } [null,aaa,bbb] queue [null,bbb] delete after queue 去掉头部null元素获取aaa元素修改aaa元素的item=null private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next;//first第一个有值的节点 h.next = h; // help GC head = first; E x = first.item;//获取元素 first.item = null;//设置为空 return x; }
什么时候扩容
定长链表不支持扩容是否线程安全
线程安全使用注意事项
默认创建方式链表醉大长度为Ineger.MAX_SIZE引用
jdk源码相关文章推荐
- LinkedBlockingQueue源码解析
- Java集合, LinkedBlockingQueue源码解析(常用于并发编程)
- Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue
- LinkedBlockingQueue源码解析
- 源码解析:ArrayBlockingQueue和LinkedBlockingQueue的区别
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java 集合系列05之 LinkedList详细介绍(源码解析)和使用示例
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue
- java并发编程基础-ReentrantLock及LinkedBlockingQueue源码分析
- Java Concurrent包源码学习和使用心得 之 LinkedBlockingQueue源码解读
- LinkedList源码解析
- List接口实现类-ArrayList、Vector、LinkedList集合深入学习以及源码解析
- LinkedList源码解析及自定义LinkedList
- LinkedList源码解析
- Java源码解析-BlockingQueue
- LinkedList源码解析
- LinkedList源码解析
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java 集合系列 04 LinkedList详细介绍(源码解析)和使用示例
- ArrayList LinkedList源码解析