您的位置:首页 > 产品设计 > UI/UE

Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue

2016-09-05 01:34 951 查看

await – signal – signalAll

以下代码,分别展示了wait/notify, 和Condition的await/signal的用法

Object o = new Object();
synchronized(o)    //线程1
{
...
o.wait();   //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
...
}

synchronized(o)   //线程2
{
...
o.notify();
...
}


ReentrantLock l = new ReentrantLock();
Condition c1 = l.newCondition();
Condition c2 = l.newCondition();

l.lock;               //线程1
try
{
...
c1.await();  //内部,会先释放锁。被其他线程signal之后,会再次拿锁。
...
c2.signal();
...
}finally
{
l.unlock();
}

l.lock;               //线程2
try
{
...
c2.await();  //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
...
c1.signal();
...
}finally
{
l.unlock();
}


通过以上代码,明确3点:

(1)Condition必须与锁协同使用:对应synchronized来说,wait()的object必须是synchronized对应的同步对象;对应ReentrantLock来说,Condition是通过ReentrantLock.newCondition()得到的。

(2)wait()/await()的时候,会先释放锁,然后进入阻塞,然后被notify/signal唤醒之后,会再去拿锁!也就是其内部有3个环节:

//释放锁

//进入阻塞

//被唤醒,拿锁,执行后续代码

(3)await/signal在使用上,比wait/notify更加灵活:

wait/notify只能附属在一个条件上,所有的阻塞线程都在这1个条件上;

而Lock可以创建多个condition,每个condition都有wait/notify,每个condition都有一个自己的阻塞线程队列。

后面所讲的BlockingQueue,将很好的展示condition的这个优点。

Condition源码分析

从上面的第(2)条可以看出,await()的时候,线程要进入阻塞。所以每个Condition内部,都维护了一个链表,或者说队列,存储所有阻塞在这个条件上的线程。

以下代码,展示了Condition的内部结构:

//ConditionObject是AQS的一个内部类,实现了Condition接口

public class ConditionObject implements Condition, java.io.Serializable {
。。。
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;     //阻塞队列

。。。


下面看一下Condition.await()的源码:

//ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}

//ReentrantLock的Sync内部类
final ConditionObject newCondition() {
return new ConditionObject();
}

//AQS的ConditionObject内部类
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();  //有人置了中段标志位,先响应中断
Node node = addConditionWaiter();  //把线程加入该condition的阻塞队列
int savedState = fullyRelease(node); //释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);   //开始阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  //被中断唤醒,跳出阻塞
break;
}
if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE)  //被唤醒之后,重新拿锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}


由上述代码可以看出,await()是会响应中断的。下面看一下屏蔽中断的await(),即awaitUninterruptibly()

public final void awaitUninterruptibly() {
Node node = addConditionWaiter();  //加入阻塞队列
int savedState = fullyRelease(node); //释放锁
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);   //进入阻塞
if (Thread.interrupted())  //被中断唤醒,没有break,继续循环, 再次进入阻塞
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)  //阻塞出来,拿锁
selfInterrupt();  //此时,再响应中断
}


下面看一下signal()的源码:

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);  //唤醒队列里面第1个
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);  //唤醒
return true;
}


关键点:无论await(), 还是signal(),都是在拿到锁之后执行的,所以其内部的入队/出队,都不需要加锁!

ArrayBlockingQueue

通常的Queue,一边是生产者,一边是消费者。一边进,一边出,有一个判空函数,一个判满函数。

而所谓的BlockingQueue,就是指当为空的时候,阻塞消费者线程;当为满的时候,阻塞生产者线程。

以下是ArrayBlockingQueue的核心结构:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

。。。
/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

//其核心就是1把锁 + 2个条件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

。。。
}


以下为其主要的构造函数:

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}


以下为其put()/take()源代码

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();  //put的时候,队列满了,阻塞
insert(e);
} finally {
lock.unlock();
}
}

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal(); //put进去之后,通知非空条件
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //take的时候,队列为空,阻塞
return extract();
} finally {
lock.unlock();
}
}

private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();  //take完了,通知非满条件
return x;
}


顺便说一句:上述2个函数,都是响应中断,并且阻塞的。

另外还有不响应中断的,不阻塞的成员函数,在此就不再详述了。

LinkedBlockingQueue

LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为头和尾是2个指针分开操作的,所以用了2把锁 + 2个条件,同时一个AtomicInteger的原子变量记录count数。

private final AtomicInteger count = new AtomicInteger(0);

/** Head of linked list */
private transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


LinkedBlockingDeque

其原理和ArrayBlockingQueue是一样的,也是1把锁 + 2个条件。只是其数据结构不是数组,而是一个双向链表。

有一个小细节:链表不是无限长吗,怎么会满呢?这里是人为设置了一个最大长度:

public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);   //最大长度是整数的最大值
}


下面是其主要结构:

public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>,  java.io.Serializable {

...

//双向链表的Node
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;

Node(E x) {
item = x;
}
}

//队列的头,尾
transient Node<E> first;
transient Node<E> last;

private transient int count;
private final int capacity;

//1把锁 + 2个条件
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

。。。
}


下面是其put/take函数,其原理和ArrayBlockQueue的put/take类似:

public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}

public E take() throws InterruptedException {
return takeFirst();
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}


PriorityBlockingQueue

和上面的BlockingQueue有2个区别:

(1)是无界的,所以只有notEmpty一个条件。put不会阻塞,只有take会阻塞

(2)通过2叉堆,实现Priority

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...

private transient Object[] queue;  //2叉堆实现

//1把锁 + 1个条件
private final ReentrantLock lock;
private final Condition notEmpty;

...
}


SynchronousQueue

SynchronousQueue是一种特殊队列,内部不是用Lock + Condition实现的。后续会单独用一篇专门阐述。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息