Inside AbstractQueuedSynchronizer (4)

3.6 ConditionObject




public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

Node node = addConditionWaiter();

int savedState = fullyRelease(node);

int interruptMode = 0;

while (!isOnSyncQueue(node)) {


if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)



if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled


if (interruptMode != 0)



以上代码中,可以看出ConditionObject的await语义跟Java语言内置的monitor机制是非常相似的(详见:http://whitesock.iteye.com/blog/162344 )。首先addConditionWaiter()将当前线程加入到ConditionQueue中,然后fullyRelease(node)释放掉跟ConditionObject关联的synchronizer锁。如果某个线程在没有持有对应的synchronizer锁的情况下调用某个ConditionObject对象的await()方法,那么跟Object.wait()一样会抛出IllegalMonitorStateException。接下来while (!isOnSyncQueue(node)) {...}会保证在其它线程调用了该ConditionObject的signal()/siangalAll()之前,当前线程一直被阻塞(signal()/siangalAll()的行为稍后会介绍)。在被signal()/siangalAll()唤醒之后,await()通过acquireQueued(node, savedState)确保再次获得synchronizer的锁。



public final void signal() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

Node first = firstWaiter;

if (first != null)



private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);


那么跟await()一样,如果某个线程在没有持有对应的synchronizer锁的情况下调用某个ConditionObject对象的signal()/siangalAll()方法,会抛出IllegalMonitorStateException。signal()主要的行为就是将ConditionQueue中对应的Node实例transfer到AbstractQueuedSynchronizer的WaitQueue中,以便在synchronizer release的过程中,该Node对应的线程可能被唤醒。

3.7 Timeout & Cancellation

AbstractQueuedSynchronizer的acquireQueued()和doAcquire***()系列方法在acquire失败(超时或者中断)后,都会调用cancelAcquire(Node node)方法进行清理,其代码如下:


private void cancelAcquire(Node node) {

// Ignore if node doesn't exist

if (node == null)


node.thread = null;

// Skip cancelled predecessors

Node pred = node.prev;

while (pred.waitStatus > 0)

node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will

// fail if not, in which case, we lost race vs another cancel

// or signal, so no further action is necessary.

Node predNext = pred.next;

// Can use unconditional write instead of CAS here.

// After this atomic step, other Nodes can skip past us.

// Before, we are free of interference from other threads.

node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.

if (node == tail && compareAndSetTail(node, pred)) {

compareAndSetNext(pred, predNext, null);

} else {

// If successor needs signal, try to set pred's next-link

// so it will get one. Otherwise wake it up to propagate.

int ws;

if (pred != head &&

((ws = pred.waitStatus) == Node.SIGNAL ||

(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

pred.thread != null) {

Node next = node.next;

if (next != null && next.waitStatus <= 0)

compareAndSetNext(pred, predNext, next);

} else {



node.next = node; // help GC



需要注意的是, cancelAcquire(Node node)方法是可能会被并发调用。while (pred.waitStatus > 0) {...}这段循环的作用就是清除当前Node之前的已经被标记为取消的节点,但是head节点除外(因为head节点保证不会被标记为Node.CANCELLED)。这段循环初看起来有并发问题,但是推敲一下之后发现:循环过程中函数参数node的waitStatus不会大于0,因此即使是多个线程并发执行这个循环,那么这些线程处理的都只是链表中互不重叠的一部分。接下来在node.waitStatus = Node.CANCELLED执行完毕之后,后续的操作都必须要避免并发问题。

关于处理线程中断, ConditionObject的await()/signal()/signalAll()等方法符合JSR 133: Java Memory Model and Thread Specification Revision中规定的语义:如果中断在signal之前发生,那么await必须在重新获得synchronizer的锁之后,抛出InterruptedException;如果中断发生在signal之后发生,那么await必须要设定当前线程的中断状态,并且不能抛出InterruptedException。

4 Reference

The java.util.concurrent Synchronizer Framework

The Art of Multiprocessor Programming
