您的位置:首页 > 产品设计 > UI/UE

AQS(AbstractQueuedSynchronizer)

2018-12-15 22:36 113 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_18515155/article/details/89346117

AQS(AbstractQueuedSynchronizer)

目录

  • release(int)

    一、概述

    AQS是AbstractQueuedSynchronizer(抽象队列同步器)的缩写。它是多线程访问共享资源的框架,ReentrantLock、CountDownLatch、Semaphore等都是基于它来实现的。


    从图中可以看到,有两个关键的组成部分,一个是state(共享资源,也可以理解为资源占用计数器),另一个是FIFO队列,用来保存需要获得共享资源的县城,其head节点始终指向当前真在占用共享资源的线程。

    进入等待队列的线程会被封装成一个Node。其主要成员如下:

    class Node {
    //在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
    static final int CANCELLED =  1;
    //值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
    static final int SIGNAL    = -1;
    //与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    static final int CONDITION = -2;
    //与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    static final int PROPAGATE = -3;
    //节点的等待状态,默认0状态:值为0,代表初始化状态。
    volatile int waitStatus;
    //前驱结点
    volatile Node prev;
    //后驱节点
    volatile Node next;
    //目标线程
    volatile Thread thread;
    //获取前驱结点
    final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }
    Node(Thread thread, Node mode) {
    this.nextWaiter = mode;
    this.thread = thread;
    }
    }

    对于共享资源state的修改,除了提供普通的getter之外,还提供了一个原子操作compareAndSetState()。

    AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

    AQS提供了几个重要的方法,参数都是state的值:

    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    自定义同步器主要实现这些方法即可,其他的工作AQS本身已经实现好了。

    以ReentrantLock为例,state初始化为0,表示资源/锁未被占用。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是==可重入==的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    二、AQS工作原理

    今天主要分析独占式下的acquire-release。

    acquire(int)

    1 public final void acquire(int arg) {
    2     if (!tryAcquire(arg) &&
    3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4         selfInterrupt();
    5 }
    [/code]

    函数步骤如下:

    1. tryAcquire(arg)。获取共享资源。
    2. addWaiter(Node.EXCLUSIVE)。将需要获取共享资源的线程放入等待队列的尾部,并标记为独占模式。
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。进入等待队列里再次尝试获取共享资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
    4. 如果如果直接获取失败或者在等待队列里被中断过则执行selfInterrupt(),调用Thread.currentThread().interrupt()来中断线程。

    下面逐个方法看。

    tryAcquire(int)

    1     protected boolean tryAcquire(int arg) {
    2         throw new UnsupportedOperationException();
    3     }
    [/code]

    该方法的实现仅仅是抛出一个异常。然而该方法正是需要自定义同步器重写的方法,包括对state的操作。

    addWaiter(Node)

    由于直接获取资源失败,该方法是将线程放到等待队列尾部。

    private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    
    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    
    //上一步失败则通过enq入队。
    enq(node);
    return node;
    }
    [/code]

    其中compareAndSetTail(pred, node)是以原子的方式进行尾节点和当前线程节点的交换。

    enq(Node)

    该方法是在快速加入尾节点失败之后执行,目的也是将node加入队尾。

    1 private Node enq(final Node node) {
    2     //"自旋",直到成功加入队尾
    3     for (;;) {
    4         Node t = tail;
    5         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
    6             if (compareAndSetHead(new Node()))//原子设置头节点
    7                 tail = head;
    8         } else {//正常流程,放入队尾
    9             node.prev = t;
    10             if (compareAndSetTail(t, node)) {
    11                 t.next = node;
    12                 return t;
    13             }
    14         }
    15     }
    16 }
    [/code]

    acquireQueued(Node, int)

    通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部“休息”,直到其他线程彻底释放资源后唤醒自己,再去获取共享资源。

     1 final boolean acquireQueued(final Node node, int arg) {
    2     boolean failed = true;//标记是否成功拿到资源
    3     try {
    4         boolean interrupted = false;//标记等待过程中是否被中断过
    5
    6         //又是一个“自旋”!
    7         for (;;) {
    8             final Node p = node.predecessor();//拿到前驱
    9             //如果前驱是head,则可以去获取资源
    10             if (p == head && tryAcquire(arg)) {
    11                 setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
    12                 p.next = null; // setHead中node.prev已置为null,此处再将原来的head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
    13                 failed = false;
    14                 return interrupted;//返回等待过程中是否被中断过
    15             }
    16
    17             //判断是否可以休息,如果可以,就进入waiting状态,如果等待过程中被中断过,就将interrupted标记为true
    18             if (shouldParkAfterFailedAcquire(p, node) &&
    19                 parkAndCheckInterrupt())
    20                 interrupted = true;
    21         }
    22     } finally {//自旋过程中超时或者被中断则从队列移除该节点
    23         if (failed)
    24             cancelAcquire(node);
    25     }
    26 }
    [/code]

    shouldParkAfterFailedAcquire(Node, Node)

    此方法主要用于检查状态,看看自己是否可以进入waiting状态。

     1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2     int ws = pred.waitStatus;//拿到前驱的等待状态
    3     if (ws == Node.SIGNAL)
    4         //如果已经告诉前驱资源释放后通知自己一下,那就可以安心休息了
    5         return true;
    6     if (ws > 0) {
    7         /*
    8          * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
    9          * 注意:那些放弃的结点稍后就会被回收
    10          */
    11         do {
    12             node.prev = pred = pred.prev;
    13         } while (pred.waitStatus > 0);
    14         pred.next = node;
    15     } else {
    16          //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它资源释放后通知自己一下。
    17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    18     }
    19     return false;
    20 }
    [/code]

    parkAndCheckInterrupt()

    休眠并且检查中断。

    1 private final boolean parkAndCheckInterrupt() {
    2     LockSupport.park(this);//调用本地方法park()使线程进入waiting状态
    3     return Thread.interrupted();//返回当前线程是否被中断。
    4 }
    [/code]

    cancelAcquire(Node)

    从队列移除节点

    private void cancelAcquire(Node node) {
    if (node == null)
    return;
    
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;//跳过已经被取消的节点一直往前找,直到找到一个有效的节点,让node的前驱结点指向该节点
    Node predNext = pred.next;//获取刚才找到的前驱结点的后置节点
    node.waitStatus = Node.CANCELLED;
    if (node == tail && compareAndSetTail(node, pred)) {//如果当前node就是尾节点,就以原子方式把刚才找到的前驱结点设置为新的尾节点
    compareAndSetNext(pred, predNext, null);//以原子的方式将上面设置为新的尾节点的后置节点置为null
    } else {
    int ws;
    if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    pred.thread != null) {//前驱结点不是头节点而且成功设置了"信号状态"的之后,就把它的后置节点指向即将要取消的node节点的后置节点
    Node next = node.next;
    if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
    } else {
    unparkSuccessor(node);//否则唤醒下一个需要获取锁的节点
    }
    
    node.next = node;
    }
    }
    [/code]

    当前节点是尾节点:

    当前节点既不是尾节点也不是头节点:

    release(int)

    释放资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

    1 public final boolean release(int arg) {
    2     if (tryRelease(arg)) {
    3         Node h = head;//找到头结点,即当前持有资源的线程对应的节点
    4         if (h != null && h.waitStatus != 0)
    5             unparkSuccessor(h);//唤醒等待队列里的下一个线程
    6         return true;
    7     }
    8     return false;
    9 }
    [/code]

    tryRelease(int)

    此方法尝试去释放指定量的资源。需要自定义同步器自己实现。

    1 protected boolean tryRelease(int arg) {
    2     throw new UnsupportedOperationException();
    3 }

    其实就是将state减去arg,如果已经彻底释放资源(state=0),要返回true,否则返回false。

    unparkSuccessor(Node)

    此方法用于唤醒等待队列中下一个线程。

    1 private void unparkSuccessor(Node node) {
    3     int ws = node.waitStatus;
    4     if (ws < 0)//置0当前线程所在的结点状态。
    5         compareAndSetWaitStatus(node, ws, 0);
    6
    7     Node s = node.next;//找到下一个需要唤醒的结点s
    8     if (s == null || s.waitStatus > 0) {//如果为空或已取消
    9         s = null;
    10         for (Node t = tail; t != null && t != node; t = t.prev)//从队列尾向前找,直到找到下一个距离node最近的有效节点
    11             if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
    12                 s = t;
    13     }
    14     if (s != null)
    15         LockSupport.unpark(s.thread);//唤醒
    16 }
    posted @ 2018-12-15 22:36 徐小贱的blog 阅读(...) 评论(...) 编辑 收藏
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: