您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer(七)——Share模式doAcquireShared

2016-07-14 10:42 567 查看
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放锁的操作和Exclusive有点不同,

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared}自旋
获取资源的时候,{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate}

/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
*   Propagation was indicated by caller,
*     or was recorded (as h.waitStatus) by a previous operation
*     (note: this uses sign-check of waitStatus because
*      PROPAGATE status may transition to SIGNAL.)
* and
*   The next node is waiting in shared mode,
*     or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
会把当前节点置位head(这个和Exclusive一样),然后还多了一步操作,判断next节点如果是Shared,通过

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared}中的
{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor}释放下一个节点。

/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases.  This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;            // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;                // loop on failed CAS
}
if (h == head)                   // loop if head changed
break;
}
}
在退出队列的条件上,和独占锁之间的主要区别在于获取共享状态成功之后的行为,而如果共享状态获取成功之后会判断后继节点是否是共享模式,

如果是共享模式,那么就直接对其进行唤醒操作,也就是同时激发多个线程并发的运行。

如果后继节点为null也会进行唤醒操作,为什么?要理解"The conservatism in both of these checks may cause unnecessary wake-ups, but only when there are multiple racing acquires/releases, so most need signals now or soon anyway."这句话。

LockSupport在unpark的时候,相当于给了一个信号,即使这时候没有现成在park状态,之后有线程执行park的时候也会读到这个信号,然后不会被挂起。

<p/>

1.举例1:{@link com.google.common.util.concurrent.AbstractFuture}<br/>

在数据准备好之前,所有的get操作都是阻塞的,然后执行了set操作之后,就会顺序(propagate语义)unparkSuccessor所有阻塞在get请求上的线程。

它的{@link com.google.common.util.concurrent.AbstractFuture.Sync#tryReleaseShared(int)}方法永远返回true,

所以这个方法要慎重使用,只有在set(complete)操作正常成功进行的时候,才会调用这个方法,然后释放所有阻塞在该资源上的锁。

<p/>

2.举例2:{@link java.util.concurrent.CountDownLatch}<br/>

它的{@link java.util.concurrent.CountDownLatch.Sync#tryReleaseShared(int)}只有当所有的锁都释放(state从

锁的个数递减到0之后)才会返回true,也才会释放所有阻塞在该资源上的锁,每次countDown都会调用该方法。

<p/>

3.总结<br/>

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared(int)}的

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared(int)}方法只要成功,

就会触发一系列的unpark操作,这一系列操作是通过

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared(int)}中的

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate}触发的,

所以只要下一个节点是Shared,就会向下执行

{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared()},

再{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor},

再{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared(int)}(肯定成功,

CountDownLatch的话已经count down到0了,SettableFuture已经执行过set方法了,两者都是执行了releaseShared),

继续{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate}直到结束或者

下一个节点是Exclusive为止。

<p/>

4.简而言之

解锁是{@link java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared(int)},

CountDownLatch调用多次,AbstractFuture调用一次。

propagate不停unpark下个Shared节点是"自旋"doAcquireShared中的setHeadAndPropagate调用doReleaseShared。

<p/>

propagate是繁殖扩散的意思,像鞭炮一样,一个接着接一个,直到把所有的被park的线程都激活。

         */
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: