Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue
2016-09-05 01:34
951 查看
await – signal – signalAll
以下代码,分别展示了wait/notify, 和Condition的await/signal的用法Object o = new Object(); synchronized(o) //线程1 { ... o.wait(); //内部,会先释放锁。被其他线程notify之后,会再次拿锁。 ... } synchronized(o) //线程2 { ... o.notify(); ... }
ReentrantLock l = new ReentrantLock(); Condition c1 = l.newCondition(); Condition c2 = l.newCondition(); l.lock; //线程1 try { ... c1.await(); //内部,会先释放锁。被其他线程signal之后,会再次拿锁。 ... c2.signal(); ... }finally { l.unlock(); } l.lock; //线程2 try { ... c2.await(); //内部,会先释放锁。被其他线程notify之后,会再次拿锁。 ... c1.signal(); ... }finally { l.unlock(); }
通过以上代码,明确3点:
(1)Condition必须与锁协同使用:对应synchronized来说,wait()的object必须是synchronized对应的同步对象;对应ReentrantLock来说,Condition是通过ReentrantLock.newCondition()得到的。
(2)wait()/await()的时候,会先释放锁,然后进入阻塞,然后被notify/signal唤醒之后,会再去拿锁!也就是其内部有3个环节:
//释放锁
//进入阻塞
//被唤醒,拿锁,执行后续代码
(3)await/signal在使用上,比wait/notify更加灵活:
wait/notify只能附属在一个条件上,所有的阻塞线程都在这1个条件上;
而Lock可以创建多个condition,每个condition都有wait/notify,每个condition都有一个自己的阻塞线程队列。
后面所讲的BlockingQueue,将很好的展示condition的这个优点。
Condition源码分析
从上面的第(2)条可以看出,await()的时候,线程要进入阻塞。所以每个Condition内部,都维护了一个链表,或者说队列,存储所有阻塞在这个条件上的线程。以下代码,展示了Condition的内部结构:
//ConditionObject是AQS的一个内部类,实现了Condition接口 public class ConditionObject implements Condition, java.io.Serializable { 。。。 /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; //阻塞队列 。。。
下面看一下Condition.await()的源码:
//ReentrantLock public Condition newCondition() { return sync.newCondition(); } //ReentrantLock的Sync内部类 final ConditionObject newCondition() { return new ConditionObject(); } //AQS的ConditionObject内部类 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //有人置了中段标志位,先响应中断 Node node = addConditionWaiter(); //把线程加入该condition的阻塞队列 int savedState = fullyRelease(node); //释放锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); //开始阻塞 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断唤醒,跳出阻塞 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //被唤醒之后,重新拿锁 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
由上述代码可以看出,await()是会响应中断的。下面看一下屏蔽中断的await(),即awaitUninterruptibly()
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); //加入阻塞队列 int savedState = fullyRelease(node); //释放锁 boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); //进入阻塞 if (Thread.interrupted()) //被中断唤醒,没有break,继续循环, 再次进入阻塞 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) //阻塞出来,拿锁 selfInterrupt(); //此时,再响应中断 }
下面看一下signal()的源码:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); //唤醒队列里面第1个 } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); //唤醒 return true; }
关键点:无论await(), 还是signal(),都是在拿到锁之后执行的,所以其内部的入队/出队,都不需要加锁!
ArrayBlockingQueue
通常的Queue,一边是生产者,一边是消费者。一边进,一边出,有一个判空函数,一个判满函数。而所谓的BlockingQueue,就是指当为空的时候,阻塞消费者线程;当为满的时候,阻塞生产者线程。
以下是ArrayBlockingQueue的核心结构:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 。。。 /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; //其核心就是1把锁 + 2个条件 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; 。。。 }
以下为其主要的构造函数:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
以下为其put()/take()源代码
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //put的时候,队列满了,阻塞 insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); //put进去之后,通知非空条件 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //take的时候,队列为空,阻塞 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); //take完了,通知非满条件 return x; }
顺便说一句:上述2个函数,都是响应中断,并且阻塞的。
另外还有不响应中断的,不阻塞的成员函数,在此就不再详述了。
LinkedBlockingQueue
LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为头和尾是2个指针分开操作的,所以用了2把锁 + 2个条件,同时一个AtomicInteger的原子变量记录count数。private final AtomicInteger count = new AtomicInteger(0); /** Head of linked list */ private transient Node<E> head; /** Tail of linked list */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
LinkedBlockingDeque
其原理和ArrayBlockingQueue是一样的,也是1把锁 + 2个条件。只是其数据结构不是数组,而是一个双向链表。有一个小细节:链表不是无限长吗,怎么会满呢?这里是人为设置了一个最大长度:
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); //最大长度是整数的最大值 }
下面是其主要结构:
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { ... //双向链表的Node static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } } //队列的头,尾 transient Node<E> first; transient Node<E> last; private transient int count; private final int capacity; //1把锁 + 2个条件 final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); 。。。 }
下面是其put/take函数,其原理和ArrayBlockQueue的put/take类似:
public void put(E e) throws InterruptedException { putLast(e); } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } } private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; Node<E> l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; ++count; notEmpty.signal(); return true; } public E take() throws InterruptedException { return takeFirst(); } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; if (f == null) return null; Node<E> n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) last = null; else n.prev = null; --count; notFull.signal(); return item; }
PriorityBlockingQueue
和上面的BlockingQueue有2个区别:(1)是无界的,所以只有notEmpty一个条件。put不会阻塞,只有take会阻塞
(2)通过2叉堆,实现Priority
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... private transient Object[] queue; //2叉堆实现 //1把锁 + 1个条件 private final ReentrantLock lock; private final Condition notEmpty; ... }
SynchronousQueue
SynchronousQueue是一种特殊队列,内部不是用Lock + Condition实现的。后续会单独用一篇专门阐述。相关文章推荐
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java 容器源码分析之ArrayBlockingQueue和LinkedBlockingQueue
- java多线程系列(九)---ArrayBlockingQueue源码分析
- java多线程系列(九)---ArrayBlockingQueue源码分析
- Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
- Java多线程-BlockingQueue-ArrayBlockingQueue-LinkedBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java LinkedBlockingQueue和ArrayBlockingQueue分析 .
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- 源码解析关于java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- JUC源码分析17-队列-ArrayBlockingQueue
- Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java多线程(六)之Deque与LinkedBlockingDeque深入分析
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- Java LinkedBlockingQueue和ArrayBlockingQueue分析