您的位置:首页 > 编程语言 > Java开发

Jdk1.8版本并发包基类AQS(AbstractQueuedSynchronizer)实现原理分析

2018-01-26 23:14 573 查看

一、AbstractQueuedSynchronizer是干嘛的?

       AbstractQueuedSynchronizer(以下简称AQS)是理解JAVA并发包里面各种类使用的一个重要的基础抽象基类,最近也在网上看了一些博客专门讲这个,感觉有些地方没有说清楚,也有一些笔误或者理解的问题,所以自己决定也写一篇。做个笔记,方便以后查阅,如果理解有误的地方,欢迎大家批评指正。

      那么,AQS是干嘛的呢,如果经常使用java并发包的朋友应该会有所了解。AQS是一个抽象的基类,它内部通过一个FIFO的同步等待队列 + 一个volatile修饰的state来管理节点等待状态等,整个类采用模板模式实现,提供一些方法供子类实现,支持互斥和共享2种模式。

      本文基于Jdk1.8版本(虽然jdk1.9已经发布了,但目前主流的使用版本应该还是1.7或者1.8)。

   

二、同步等待队列

      在AQS里面,同步等待队列叫做CLH(Craig, Landin, and Hagersten);当线程要入队的时候,只需要构建成一个Node节点加入队尾即可,如果要出队,只需要将队首节点移除,同时唤醒下一个等待的节点。这个CLH队列可以看成是一个链表实现的,

       刚开始队列里面只有一个空的Head节点,这个时候Head节点和Tail节点都是同一个节点,这个节点是虚拟的,当有线程没有获取的锁对象的时候,那么就需要构建一个Node节点往这个CLH队列里面加,同时更新节点之间的引用关系(主要就是链表的东西),然后加入到队列里面后,一直自旋去尝试获取同步等待状态,当然,在队列里自旋的时候,还有其他很多细节,接下来我们先来分析这个Node节点的定义。

    

static final class Node {

/** Marker to indicate a node is waiting in shared mode */
//表示一个共享节点模式下等待的常量
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
//表示在互斥模式下等待的常量
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
//表示当前节点线程被取消
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
//表示当前节点线程等待被唤醒
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
//表示当前节点线程正在等待某个条件
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//表示下一次共享模式的同步等待状态的获取将会一直循环下去
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
*   SIGNAL:     The successor of this node is (or will soon be)
*               blocked (via park), so the current node must
*               unpark its successor when it releases or
*               cancels. To avoid races, acquire methods must
*               first indicate they need a signal,
*               then retry the atomic acquire, and then,
*               on failure, block.
*   CANCELLED:  This node is cancelled due to timeout or interrupt.
*               Nodes never leave this state. In particular,
*               a thread with cancelled node never again blocks.
*   CONDITION:  This node is currently on a condition queue.
*               It will not be used as a sync queue node
*               until transferred, at which time the status
*               will be set to 0. (Use of this value here has
*               nothing to do with the other uses of the
*               field, but simplifies mechanics.)
*   PROPAGATE:  A releaseShared should be propagated to other
*               nodes. This is set (for head node only) in
*               doReleaseShared to ensure propagation
*               continues, even if other operations have
*               since intervened.
*   0:          None of the above
*
* The values are arran
fd6c
ged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes.  It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing.  Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
//当前节点的前一个节点
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued.  The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check.  The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
//当前节点的后一个节点
volatile Node next;

/**
* The thread that enqueued this node.  Initialized on
* construction and nulled out after use.
*/
//获取同步等待状态的当前线程
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED.  Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/

//当前节点的下一个处于等待状态的节点
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
 final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null.  The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
//找到当前节点的前一个节点
 final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {    // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) {     // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

     同步等待相关的节点定义

   

/**
* Head of the wait queue, lazily initialized.  Except for
* initialization, it is modified only via method setHead.  Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
//头节点
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized.  Modified only via
* method enq to add new wait node.
*/
//尾节点
private transient volatile Node tail;

/**
* The synchronization state.
*/
// volatile修饰的int类型的同步等待状态
 private volatile int state;


      AQS主要就靠这个Node节点来维护同步等待队列,接来下我们分互斥模式访问和共享模式访问来分别看,AQS是如何实现的。

三、互斥模式

        在互斥模式下,请求去获取一个锁的方法如下(因为还提供了支持中断和超时的实现,所以这里先以不支持超时和中断的这个方法作为入口)

       

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


      acquire方法,首先会调用tryAcquire,这是一个空的方法,具体实现交给子类自己去实现,如果tryAcquire方法返回true,说明这个线程获取了锁,那么这里就不处理,如果返回了false,会分为以下几步:

     1、addWaiter 方法构建一个互斥模式的节点Node,将其加入这个CLH队列。

    2、acquireQueued方法,当节点入队后,会一直自旋,acquireQueued方法主要用来处理这个自旋节点什么时候该被唤醒,是否该被中断等

    下面来看一下具体的源码,先看addWaiter是如何构建Node节点入队的

    

private Node addWaiter(Node mode) {
//构造一个节点,第一个参数为当前线程,第二个参数为节点类型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//首先尝试快速入队尾,如果失败了再说,这里是将队尾节点tail赋值为一个新节点pred
Node pred = tail;
if (pred != null) {
//如果快速入队成功了,那么就将当前节点的前一个节点设置为原来的那个队尾节点
node.prev = pred;
//CAS将当前节点设置为队尾节点
if (compareAndSetTail(pred, node)) {
//原来的队尾节点的下一个节点就是当前这个入队节点,也就是当前入队节点现在是队尾节点
pred.next = node;
return node;
}
}
//如果快速入队失败了,那么实行正常入队
enq(node);
return node;
}

     当快速入队失败了,那么就自旋去进行正常入队

    

private Node enq(final Node node) {
/**一直自旋,相当于一个死循环 */
for (;;) {
Node t = tail;
//如果目前队列里面还没有任何节点,那么需要new一个哑铃节点来充当队列头节点,此时整个队列里面
//只会有一个虚拟的head,tail节点,这2个节点指向的是同一个节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将当前节点的前驱节点指向同步等待队列的尾节点。
node.prev = t;
//CAS设置队列尾节点为当前节点
if (compareAndSetTail(t, node)) {
//设置成功后,以前的队尾节点的下一个节点就指向了当前入队的节点,这时入队成功,然后返回
t.next = node;
return t;
}
}
}
}


     入队成功后,进行自旋等待,看什么时候,自己可以被唤醒去尝试获取锁

    

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//刚刚加入到CLH队列的当前线程是进行自旋,去获取锁
for (;;) {
//获取当前节点的上一个节点
final Node p = node.predecessor();
//如果上一个节点是头节点,说明队列里面只有自己在排队等待获取锁,那么就直接尝试去获取锁
if (p == head && tryAcquire(arg)) {
//如果获取锁成功,将当前节点设置为头节点
setHead(node);
//原本的头节点的下一个节点就不存在了,设置为null,便于gc
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果上面返回失败了,那么首先看一下当前节点是否需要被阻塞,并检查中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

        如果这个入队的节点,刚开始不被允许取获取这个等待状态,那么就需要看一下这个节点是否应该被阻塞。什么意思呢,就是加入这个节点进入队列的时候,他前面已经有其他节点排着队了,根据FIFO的公平性原则,你这个节点必须得排队等待,在这里的话也就是说会被阻塞住,就没有必要一直去自旋尝试,只需要等待它的前一个节点来唤醒你就好了。
      

       现在来看一下 shouldParkAfterFailedAcquire,该方法主要是判断当前节点线程是否应该被阻塞

       

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的等待状态
int ws = pred.waitStatus;
//如果前驱节点的等待状态是SIGNAL,表明前驱节点也在等待被唤醒,根据队列公平性原则,当前节点必须排队,所以当前节点需要被阻塞,这里就返回了true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果当前节点的前驱节点的等待状态为 取消状态(可能前驱节点被中断或者超时了)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 因为前驱节点已经被取消了,那么需要重新规划当前节点的前驱节点的引用,也就是当前节点的前驱节点是已经被

//取消的这个前驱节点的前驱节点 Node <---- preNod <-----Node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE.  Indicate that we
* need a signal, but don't park yet.  Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//其他情况CAS设置当前节点的前驱节点等待状态为SIGNAL,返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

     如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程

    

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}


    回到acquire方法,最后如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回true,说明当前线程被中断,会继续调用selfInterrupt方法

    

static void selfInterrupt() {
//中断当前线程
Thread.currentThread().interrupt();
}

 

     当获取锁后,业务处理完后,就需要释放同步等待状态,会调用下面的方法来释放锁

    

public final boolean release(int arg) {
//首先尝试去释放等待状态,tryRelease同样交给子类自己去个性化实现
if (tryRelease(arg)) {
Node h = head;
//如果head不为null且,该节点的等待状态不为0,那么就需要唤醒它的后续节点,让后续节点去获取锁
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}


      当某个节点业务处理完后释放锁之后,需要唤醒它的后继节点,如果后继节点有节点已经被中断或者超时取消了,需要继续找到一个可用的节点唤醒

    

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling.  It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取当前节点的等待状态
int ws = node.waitStatus;
if (ws < 0)
//如果节点等待状态小于0,那么就设置为0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node.  But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//如果当前节点的下一个节点为null或者说下一个节点的等待状态>0(也就是下一个节点被取消或者中断了),那么需要反向往前找到一个节点不为null且状态不大于0的节点,将它进行唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒节点线程
LockSupport.unpark(s.thread);
}

      以上就是互斥模型下基本类型的获取锁,释放锁操作,接下来我们再来看一下支持中断和超时的互斥模式实现。

 三、支持响应中断的互斥模式

 

     支持响应中断的意思就是,在CLH队列里等待的节点,可以被中断

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}


    当tryAcquire返回false,也就是该线程节点没有获取到锁,接着调用 doAcquireInterruptibly方法

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//构建节点入队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//自旋
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//入队后自旋尝试去获取或,如果该节点应该被中断,那么直接抛异常(这点的处理和刚刚上面的处理不一样,上面是加了一个中断的标志)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

     可以看到,和最基本的互斥模式实现差不多,至少支持中断的请求,如果判断了该线程节点需要阻塞的话,直接抛一个异常出去。

四、支持超时时间设置的独占模式

      当执行tryAcquire的时候,可以传入一个超时时间,当这个节点线程没有获取到锁而在CLH队列里面排队的时候,如果指定时间内还没有获取锁,那么直接返回,不用再队列里再等待。

    

    

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}


      如果没有获取到锁,也就是tryAcquire返回了false,则进入下面的方法

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//如果超时时间小于0,直接返回false
if (nanosTimeout <= 0L)
return false;
//赋值 超时时间
final long deadline = System.nanoTime() + nanosTimeout;
//构建节点入队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//在自旋的过程中,不断校验剩余的超时时间
nanosTimeout = deadline - System.nanoTime();
//如果自旋的过程中已经超时了,直接返回false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//如果已经被中断,直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

    至此,AQS里面基于互斥模式的获取锁、释放锁操作的源码实现大概流程分析完了。而对于共享模式,其实和互斥模式差不多,下面我们来简单看一下共享模式的AQS实现

   

五、共享模式

 

    先看一下共享模式下的获取锁方法

    

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}


     如果(tryAcquireShared返回小于0表示请求成功(该方法同样交给了子类去实现),如果没有成功则继续调用doAcquireShared,看一下调用失败后的处理方法doAcquireShared

   

     

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 将当前线程以共享模式加入同步等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//主循环
for (;;) {
//获取当前线程的前一个节点
final Node p = node.predecessor();
//如果p是头节点,那么继续调用tryAcquireShared尝试去获取锁
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//p节点被移除,置空next引用,帮助GC
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果当前节点的前驱节点不是头节点,判断当前节点请求失败后是否要被阻塞,如果是,阻塞并保存当前线程中断状态。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}


       共享模式也有支持中断和超时的方法,和互斥模式实现基本一样,这里就不重复分析了。

       接下来看一下共享模式下的释放操作

   

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases.  This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
//判断同步等待队列是否为空
if (h != null && h != tail) {
//如果不为空,获取头节点的等待状态。
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//如果等待状态是SIGNAL,说明其后继节点需要唤醒
//尝试修改等待状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;            //如果修改失败,重新循环检测。
unparkSuccessor(h);//如果修改成功,唤醒头节点的后继节点。
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果等待状态是0,尝试将其(头节点)设置为PROPAGATE
continue;                // 如果设置失败,继续循环检测。
}
if (h == head)                   // 如果过程中头节点没有发生变化,循环退出;否则需要继续检测。
break;
}
}


      共享模式的也分析到这,AQS里面还提供了一些检测的方法,比如判断同步等待队列中是否有线程在等待等,都比较简单。    如果读者感兴趣,可自行查看源码或加qq交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: