您的位置:首页 > 其它

AQS源码分析(共享与互斥)

2016-03-04 09:40 369 查看

共享模式与独占模式

AQL的内部队列采用的是CLH队列锁模型,CLH队列是由一个一个结点(Node)构成的。Node类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个结点支持共享模式还是独占模式,共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。

[java] view plain copy

 print?

/** Marker to indicate a node is waiting in shared mode */  

static final Node SHARED = new Node();  

  

/** Marker to indicate a node is waiting in exclusive mode */  

static final Node EXCLUSIVE = null;  

  

final boolean isShared() {  

    return nextWaiter == SHARED;  

}  

以上代码是两种模式的定义,可以通过方法isShared来判断一个结点处于何种模式。

共享模式下获取锁

共享模式下获取锁是通过tryAcquireShared方法来实现的,其流程大至如下:



AQS类方法中方法名不含shared的默认是独占模式,前面提到子类需要重写tryAcquire方法,这是在独占模式下。如果子类想支持共享模式,同样必须重写tryAcquireShared方法,线程首先通过tryAcquireShared方法在共享模式下获取锁,如果获取成功就直接返回,否则执行以下步骤:

[java] view plain copy

 print?

/** 

 * Acquires in shared uninterruptible mode. 

 * @param arg the acquire argument 

 */  

private void doAcquireShared(int arg) {  

    final Node node = addWaiter(Node.SHARED);  

    boolean failed = true;  

    try {  

        boolean interrupted = false;  

        for (;;) {  

            final Node p = node.predecessor();  

            if (p == head) {  

                int r = tryAcquireShared(arg);  

                if (r >= 0) {  

                    setHeadAndPropagate(node, r);  

                    p.next = null; // help GC  

                    if (interrupted)  

                        selfInterrupt();  

                    failed = false;  

                    return;  

                }  

            }  

            if (shouldParkAfterFailedAcquire(p, node) &&  

                parkAndCheckInterrupt())  

                interrupted = true;  

        }  

    } finally {  

        if (failed)  

            cancelAcquire(node);  

    }  

}  

1、创建一个新结点(共享模式),加入到队尾,这个过程和独占模式一样,不再重复;
2、判断新结点的前趋结点是否为头结点,如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL,当前线程可以安全地挂起,整个过程结束;
3、如果它的前趋是头结点,就让前趋在共享模式下获取锁,如果获取成功,把当前结点设置为头结点;
4、设置为头结点之后,满足释放锁条件就阻塞等待释放锁。
满足释放锁的条件为:允许传播或者需要通知继任结点,或者继任结点是共享模式的结点

[java] view plain copy

 print?

if (propagate > 0 || h == null || h.waitStatus < 0) {  

          Node s = node.next;  

          if (s == null || s.isShared())  

              doReleaseShared();  

      }  

共享模式下释放锁

这是通过方法releaseShared来实现的,整个流程如下:
1、调用子类的tryReleaseShared尝试获取锁,如果失败,直接返回;
2、如果成功调用doReleaseShared方法做后续处理,doReleaseShared方法如下:

[java] view plain copy

 print?

/** 

    * Release action for shared mode -- signal successor and ensure 

    * propagation. (Note: For exclusive mode, release just amounts 

    * to calling unparkSuccessor of head if it needs signal.) 

    */  

   private void doReleaseShared() {  

       /* 

        * Ensure that a release propagates, even if there are other 

        * in-progress acquires/releases.  This proceeds in the usual 

        * way of trying to unparkSuccessor of head if it needs 

        * signal. But if it does not, status is set to PROPAGATE to 

        * ensure that upon release, propagation continues. 

        * Additionally, we must loop in case a new node is added 

        * while we are doing this. Also, unlike other uses of 

        * unparkSuccessor, we need to know if CAS to reset status 

        * fails, if so rechecking. 

        */  

       for (;;) {  

           Node h = head;  

           if (h != null && h != tail) {  

               int ws = h.waitStatus;  

               if (ws == Node.SIGNAL) {  

                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  

                       continue;            // loop to recheck cases  

                   unparkSuccessor(h);  

               }  

               else if (ws == 0 &&  

                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  

                   continue;                // loop on failed CAS  

           }  

           if (h == head)                   // loop if head changed  

               break;  

       }  

   }  

这个方法就一个目的,就是把当前结点设置为SIGNAL或者PROPAGATE,如果当前结点不是头结点也不是尾结点,先判断当前结点的状态位是否为SIGNAL,如果是就设置为0,因为共享模式下更多使用PROPAGATE来传播,SIGNAL会被经过两步改为PROPAGATE:

compareAndSetWaitStatus(h, Node.SIGNAL, 0)

compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

为什么要经过两步呢?原因在unparkSuccessor方法:

[java] view plain copy

 print?

private void unparkSuccessor(Node node) {  

    int ws = node.waitStatus;  

    if (ws < 0)  

        compareAndSetWaitStatus(node, ws, 0);  

        ......  

}  

如果直接从SIGNAL到PROPAGATE,那么到unparkSuccessor方法里面又被设置为0:SIGNAL--PROPAGATE---0----PROPAGATE
对头结点相当于多做了一次compareAndSet操作,其实性能也殊途同归啦!

闭锁(CountDownLatch)

闭锁是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
闭锁有几个重要的方法:

[java] view plain copy

 print?

public void await() throws InterruptedException;  

public void countDown();  

其中await方法使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,如果锁存器为0方法立即返回,一开始锁存器不会为0,当调用countDown方法之后锁存器会减少,当锁存器减少到0时,await方法就会返回。现在看看await方法的实现:

[java] view plain copy

 print?

public void await() throws InterruptedException {  

    sync.acquireSharedInterruptibly(1);  

}  

不出所料,闭锁的await方法正是使用的共享模式的AQS,acquireSharedInterruptibly和acquireShared方法类似,只不过会先响应中断。也就是当有多个线程调用await方法时,这些线程都被阻塞到了doAcquireShared方法的以下地方:

[java] view plain copy

 print?

if (shouldParkAfterFailedAcquire(p, node) &&  

                   parkAndCheckInterrupt())  

                   interrupted = true;  

前面看到doAcquireShared里面有一个for循环,退出for循环的唯一方式是要tryAcquireShared方法返回值大于0,下面看看tryAcquireShared的方法在闭锁中的实现:

[java] view plain copy

 print?

public class CountDownLatch {  

    private static final class Sync extends AbstractQueuedSynchronizer  {  

        Sync(int count) {  

            setState(count);  

        }  

        ......  

    }  

      

    private final Sync sync;  

          

    protected int tryAcquireShared(int acquires) {  

        return (getState() == 0) ? 1 : -1;  

    }  

    ......  

}  

count代表是的线程数,在创建闭锁的同步器时这个count值被赋给了state,因此state肯定不为0,所以tryAcquireShared方法肯定返回-1,也就是这些线程调用await方法时tryAcquireShared都返回-1,这些线程都会阻塞在doAcquireShared的for循环里。然后这些线程依次调用countDown方法,直到最后一个线程调用完后这些线程才会退出for循环继续执行。下面看看countDown方法的实现过程:

[java] view plain copy

 print?

public void countDown() {  

    sync.releaseShared(1);  

}  

  

//sync.releaseShared  

public final boolean releaseShared(int arg) {  

    if (tryReleaseShared(arg)) {  

        doReleaseShared();  

        return true;  

    }  

    return false;  

}  

仍然不出所料,countDown方法正是调用的releaseShared方法,前面提到releaseShared会先调用tryReleaseShared方法,这是由闭锁实现的:

[java] view plain copy

 print?

protected boolean tryReleaseShared(int releases) {  

    // Decrement count; signal when transition to zero  

    for (;;) {  

        int c = getState();  

        if (c == 0)  

            return false;  

        int nextc = c-1;  

        if (compareAndSetState(c, nextc))  

            return nextc == 0;  

    }  

}  

该方法会递减state的值,直到变为0返回false.
现在整个闭锁的执行流程很明确了:N个线程调用await阻塞在for循环里面,然后N个线程依次调用countDown,每调用一次state减1,直接state为0,这些线程退出for循环(解除阻塞)!
退出for循环时,由于头结点状态标志位为PROPAGATE,而且这些结点都是共享模式,由头结点一传播,这些结点都获取锁,于是齐头并进执行了......
共享与独占在读写锁里面也有用到,后面再分析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: