3.Java数据结构原理解析-Queue系列
2017-11-28 09:21
781 查看
Queue,也就是队列,满足FIFO的特性。
在Java中,Queue是一个接口,它的实现类有很多,其中非线程安全的代表是LinkedList,线程安全的有阻塞和非阻塞的,阻塞的大都实现了Queue的子接口BlockingQueue(阻塞队列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。
Queue接口方法定义:
BlockingQueue接口定义(BlockingQueue除了继承Queue定义的方法外,还加入了自己的阻塞方法):
队列大多数是在多线程环境下使用的,生产者线程往队列中添加元素,消费者线程从队列中取出元素。所以,下面重点讨论ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是采用什么样的数据结构和算法来保证队列的线程安全性的。
首先,来看看ArrayBlockingQueue的属性。
ArrayBlockingQueue的长度是固定的,无法扩容,所以创建一个ArrayBlockingQueue对象时,必须指定队列的容量,并且ArrayBlockingQueue不允许原始为null。从构造函数上可以看出这一点。
(1)插入元素add、offer、put
(2)取出元素remove、poll、take
LinkedBlockingQueueue由于使用了两个锁,所以允许同时添加和取出元素。这一点是和ArrayBlockingQueue最大的区别。
一个类的属性体现了这个类的数据结构,我们首先看看LinkedBlockingQueueue的属性
照常理来说,取出一个元素后,队列应该是notFull,那么拿锁控制的是应该是notFull的条件变量,但是因为此处存在两把锁,可能在取出元素后,又有元素加入了。所有此处拿锁控制的是notEmpty,取出元素后,只要判断剩下的元素是否大于1就可以了,因为不可能有两个线程同时执行取操作。
(1)插入元素add、offer、put
(2)取出操作remove、poll、take
可以参考:https://zhuanlan.zhihu.com/p/29227508
https://www.jianshu.com/p/376d368cb44f?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
下面介绍的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底层的数据结构和LinkedBlockingQueue相同,也是使用单链表,不同的是ConcurrentLinkedQueue通过
Unsafe类提供了硬件级别的原子操作,主要compareAndSwapXXX方法实现。
关于Unsafe,网上有很多资源,请自行查阅。
我们首先来看看ConcurrentLinkedQueue的成员变量。
(1)入队add、offer
需要注意的是,每次入队之后,tail并不是总指向最后一个节点。奇数时是倒数第二个节点,偶数时是第一个节点。
可参考:http://blog.csdn.net/u013991521/article/details/53068549
在Java中,Queue是一个接口,它的实现类有很多,其中非线程安全的代表是LinkedList,线程安全的有阻塞和非阻塞的,阻塞的大都实现了Queue的子接口BlockingQueue(阻塞队列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。
Queue接口方法定义:
//添加元素,成功返回true,容量不够抛IllegalStateException boolean add(E e) //添加元素,成功返回true,容量不足返回false boolean offer(E e) //移除队首元素,队列为空时抛NoSuchElementException E remove() //移除队首元素,队列为空时返回null E poll() //查看队首元素,队列为空时抛NoSuchElementException E element() //查看队首元素,队列为空时返回null E peek()
BlockingQueue接口定义(BlockingQueue除了继承Queue定义的方法外,还加入了自己的阻塞方法):
//添加元素,容量不足阻塞 void put(E e) throws InterruptedException //添加元素,容量不足时等待指定时间 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException //移除队首元素,队列为空时阻塞 E take() throws InterruptedException //移除队首元素,队列为空时等待指定时间 E poll(long timeout, TimeUnit unit) throws InterruptedException
队列大多数是在多线程环境下使用的,生产者线程往队列中添加元素,消费者线程从队列中取出元素。所以,下面重点讨论ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是采用什么样的数据结构和算法来保证队列的线程安全性的。
1.阻塞队列 ArrayBlockingQueue
ArrayBlockingQueue底层的数据结构是数组和循环队列,使用一个可重入锁和这个锁的两个条件对象进行并发控制。首先,来看看ArrayBlockingQueue的属性。
//存放元素的数组 final Object[] items; //循环队列头指针,起始值为0 int takeIndex; //循环队列尾指针,指向下一个元素插入的位置,起始值为0 int putIndex; //元素的个数 int count; //可重入锁(被final修饰,之所以没有初始化,是因为所有的构造方法里面都对lock进行了初始化) final ReentrantLock lock; //队列非空条件 private final Condition notEmpty; //队列未满条件 private final Condition notFull;
ArrayBlockingQueue的长度是固定的,无法扩容,所以创建一个ArrayBlockingQueue对象时,必须指定队列的容量,并且ArrayBlockingQueue不允许原始为null。从构造函数上可以看出这一点。
//创建一个指定容量的队列,锁默认是非公平的 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //创建一个指定容量、指定锁的公平性的队列 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //创建锁 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //用现有的集合创建一个指定容量、指定锁的公平性的队列 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //创建队列 this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion(锁定只用于可见性,而不是互斥) try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //尾指针 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
(1)插入元素add、offer、put
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; //队列尾指针+1 putIndex = inc(putIndex); ++count; //通知在notEmpty上等待的线程 notEmpty.signal(); } //循环加。循环队列的实现就体现在这里 final int inc(int i) { return (++i == items.length) ? 0 : i; } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //在锁上等待,直到获取锁,但是会响应中断,优先考虑响应中断,而不是响应锁的普通获取或重入获取。 //不明白为什么add和offer方法使用lock,而put方法使用lockInterruptibly? lock.lockInterruptibly(); try { //队列已满,在notFull对象上等待 while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
(2)取出元素remove、poll、take
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //队列为空时返回null return (count == 0) ? null : extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; //队列头指针+1 takeIndex = inc(takeIndex); --count; //通知在notFull对象上等待的线程 notFull.signal(); return x; } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空时,在notEmpty上等待 while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
2.阻塞队列 LinkedBlockingQueueue
LinkedBlockingQueueue底层的数据结构是单向链表,使用两个可重入锁(放锁和拿锁)和对象的条件对象来进行并发控制。LinkedBlockingQueueue由于使用了两个锁,所以允许同时添加和取出元素。这一点是和ArrayBlockingQueue最大的区别。
一个类的属性体现了这个类的数据结构,我们首先看看LinkedBlockingQueueue的属性
//链表的节点。从节点可以看出该链表只有一个next指针,是单向的, static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //队列的容量,定义为final,说明所有的构造方法必须初始化容量 private final int capacity; //元素的个数,因为使用了放锁和拿锁两个锁,所以同时添加和取出元素时存在并发问题,使用原子操作来保证元素的个数的准确性 private final AtomicInteger count = new AtomicInteger(0); //单向链表头指针,head.item永远为null。(定义为transient说明不能序列化) private transient Node<E> head; //单向链表尾指针,last.next永远为null。(定义为transient说明不能序列化) private transient Node<E> last; //拿锁(控制remove、poll、take方法等) private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); //放锁(控制add、offer、put方法等) private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
照常理来说,取出一个元素后,队列应该是notFull,那么拿锁控制的是应该是notFull的条件变量,但是因为此处存在两把锁,可能在取出元素后,又有元素加入了。所有此处拿锁控制的是notEmpty,取出元素后,只要判断剩下的元素是否大于1就可以了,因为不可能有两个线程同时执行取操作。
(1)插入元素add、offer、put
//add方法是在AbstractQueue实现了,所以跟ArrayBlockingQueue一样
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
//锁定放锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//队列未满
if (count.get() < capacity) {
enqueue(node);
//队列长度+1
c = count.getAndIncrement();
//插入之后,队列还是未满,通知在notFull对象上的等待的线程(例如:put方法)
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0表示插入之前队列为空,队列为空说明可能有读线程在阻塞,如果c>0,说明肯定没有读线程在阻塞
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//signalNotEmpty虽然用在offder/put中,但是从不在putLock的同步区内。这样就保证同一时刻只持有一个锁,这样就不会出现死锁问题。
//???关于此处为什么加锁的问题,暂时就是这样理解的
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//入队。入队操作很简单,就是将链表尾指针的next节点指向当前节点,并把当前节点设置为尾指针
private void enqueue(Node<E> node) {
last = last.next = node;
}
(2)取出操作remove、poll、take
//remove()方法是在AbstractQueue中实现了,跟ArrayBlockingQueue一样 public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { //出队 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //出队操作比较简单,就是将单链表的头指针指向下一个元素 private E dequeue() { Node<E> h = head; Node<E> first = h.next; //不是很明白这个,如果要帮助GC,直接将h.next=null不是更好吗? h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { //在notEmpty条件上等待 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
3.阻塞队列 SynchronousQueue
SynchronousQueue跟上面两个阻塞队列不同,它内部没有容器,一个生产线程put的时候,如果当前没有消费线程执行take,此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的数据(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。可以参考:https://zhuanlan.zhihu.com/p/29227508
https://www.jianshu.com/p/376d368cb44f?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
4.非阻塞队列 ConcurrentLinkedQueue
ArrayBlockingQueue和LinkedBlockingQueue都是阻塞的,阻塞体现在入队和出队的时候需要加锁。下面介绍的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底层的数据结构和LinkedBlockingQueue相同,也是使用单链表,不同的是ConcurrentLinkedQueue通过
sun.misc.Unsafe类的CAS操作来保证线程安全的。
Unsafe类提供了硬件级别的原子操作,主要compareAndSwapXXX方法实现。
关于Unsafe,网上有很多资源,请自行查阅。
我们首先来看看ConcurrentLinkedQueue的成员变量。
//单链表头节点 private transient volatile Node<E> head; //单链表尾节点 private transient volatile Node<E> tail; //节点类型。与LinkedBlockingQueue不同的是,所有的赋值操作都是通过Unsafe对象的CAS来完成的,所以是线程安全的 private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } //为item赋值 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } //为next指针赋值 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { //获取item属性和next属性的内存地址 UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
(1)入队add、offer
需要注意的是,每次入队之后,tail并不是总指向最后一个节点。奇数时是倒数第二个节点,偶数时是第一个节点。
public boolean add(E e) { return offer(e); } public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { //获得p的下一个节点 Node<E> q = p.next; //如果下一个节点是null,也就是p节点就是尾节点 if (q == null) { //将单链表的尾节点的next指针指向新节点 if (p.casNext(null, newNode)) { if (p != t) //如果tail不是尾节点则将入队节点设置为tail。 // 如果失败了,那么说明有其他线程已经把tail移动过 casTail(t, newNode); return true; } // Lost CAS race to another thread; re-read next } // 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回 else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } //为队列的尾节点赋值 private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); }
可参考:http://blog.csdn.net/u013991521/article/details/53068549
相关文章推荐
- 2.Java数据结构原理解析-List系列
- 1.Java数据结构原理解析-Map系列
- 4.Java数据结构原理解析-Set系列
- java基础解析系列(六)---注解原理及使用
- java基础解析系列(七)---ThreadLocal原理分析
- 深入Java集合学习系列:ConcurrentLinkedQueue及其实现原理
- java基础解析系列(四)---LinkedHashMap的原理及LRU算法的实现
- java基础解析系列(八)---fail-fast机制及CopyOnWriteArrayList的原理
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理
- java基础解析系列(四)---LinkedHashMap的原理及LRU算法的实现
- java基础解析系列(四)---LinkedHashMap的原理及LRU算法的实现
- 深入Java集合学习系列:Hashtable的实现原理
- 深入Java集合学习系列:ArrayList的实现原理
- 深入解析Java对象的hashCode和hashCode在HashMap的底层数据结构的应用
- Java 集合系列12之 TreeMap详细介绍(源码解析)和使用示例
- Java集合干货系列-(三)HashMap源码解析
- Java多线程系列--“JUC线程池”02之 线程池原理(一)
- 【数据结构】链表的原理及java实现
- JAVA8中的stream原理解析——1(串行)
- JAVA内存系列三之垃圾收集与内存分配原理