您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer实现Android版源码解析(一)

2014-06-30 16:05 525 查看
由于Android版的AbstractQueuedSynchronizer实现与JDK版本稍有不同,这里主要针对Android版实现进行解析。

AbstractQueuedSynchronizer简介

AbstractQueuedSynchronizer(AQS)是实现依赖于FIFO等待队列的阻塞锁定或者相关同步器(ReentrantLock, Semaphore等)的一个框架,在java.util.concurrent并发库中有着举足轻重的作用。AQS把单个原子int值来表示状态(对于long有专属类AbstractQueuedLongSynchronizer),一般子类通过继承该类后,定义哪种状态属于当前对象被获取或者被释放,一般使用getState()、setState(int)、compareAndSetState(int,
int)方法来更新int值。

AQS支持独占(exclusive)模式或者共享(shared)模式。在独占模式下,只有指定线程可以成功获得锁。在共享模式下,多个线程可以获取锁成功。

AQS内部定义一个嵌套类ConditionObject,可以用做Condition实现。

一般情况下,子类为来实现对应的同步需求,可以重载以下方法(具体原理稍后解释)

tryAcquire(int) 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

tryRelease(int) 试图设置状态来反映独占模式下的一个释放。

tryAcquireShared(int) 试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。

tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。

isHeldExclusively() 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。

具体实现解析

1. AQS等待队列原理

AQS内部以一个双向链表队列作为线程等待队列(CLH锁队列变种),每个等待这把锁的线程作为链表的一个结点。每个结点包含一个“status”域来跟踪每条线程是否需要阻塞。当祖先被释放的时候,这个结点会被触发。在等待队列中的处于第一个的线程可以尝试获取锁,但处于第一不保证一定成功获取,只代表它有权去竞争,所以当前释放的竞争者线程可能需要重新等待。

作为一个FIFO等待队列的实现,唤醒结点从head开始,插入结点从tail开始。

//等待队列结构如下:
+------+  prev +-----+       +-----+
head | dummy| <---- |     | <---- |     |  tail
| node |  next |     |       |     |
+------+ ----> +-----+ ----> +-----+


head一直只会引用一个无效空结点,其后继结点才是第一个表示等待该锁的线程结点。

prev主要用在取消获取锁操作上。如果一个结点被取消来,他的后继结点一般会被重新连接到一个非取消的祖先。

next用来实现阻塞机制。祖先通过遍历next连接触发下一个结点去决定被唤醒的线程。后继结点的查找必须避免与新进入队列的结点设置next域竞争。如果出现后继结点为null的情况下,可以通过从tail开始从后往前的查询真正的继承者解决竞争。(或者,next连接可以看到避免经常从后往前的扫描的优化)。

CLH队列需要一个无效的头结点作为初始化。不过实现里并没有在构造时刻创建,因为可能如果没有产生竞争的时候,这个无效头结点就浪费。因此是在第一次竞争的时候创建这个无效头结点。

结点状态有以下几种

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

另外还包括默认值0。

SIGNAL 该结点的后继者已经(或者很快就会)被阻塞,所以当前结点必须在释放或者取消的时候唤醒它的后继结点。

CANCELLED 当前结点由于超时或者interrupt被取消,一经设置,结点无法改变此状态。换句话说,该线程不会再次被阻塞。

CONDITION 结点目前在结点队列里。当重新进入等待队列后,这个状态会被设置为0。

PROGPAGATE 一个releaseShared应该被传递给其他结点。在doReleaseShared里设置(只针对头结点)去确保传递会继续。

这些状态值里,非负数表示一个结点不需要被触发。所以,大部分代码值需要简单测试正负即可。

在条件状态下的线程使用相同结点表达,不过使用里额外的连接。由于条件状态只能在独占模式下访问,因此只需要简单的结点连接(不存在并发)。在等待时候,一个结点会插入到条件队列,在触发的时候,这个结点会把转移到等待队列,一个特别的状态值(CONDITION)会被设置去标记这个结点在哪个队列上。

2.源码剖析

AQS作为java并发的重中之中,内部实现相当巧妙,但千里之行始于足下,我们先从独占模式开始,从acquire入手剖析如果多个线程同时争抢锁的时候,争抢失败的线程如何进入队列等待锁。

(1)acquire获取锁流程

首先,获取锁一般是acquire函数,不过AQS提供了acquire的多个变种,其中包括(仅限独占模式)

//在独占模式下获取锁,忽略线程interrupted
public final void acquire(int arg)

//在独占模式下获取锁,如果线程interrupted,抛出异常,放弃等待锁。
public final void acquireInterruptibly(int arg)

//尝试在独占模式下获取锁,如果线程interrupted,抛出异常,同时等待超过指定时间时,会返回失败
public final boolean tryAcquireNanos(int arg, long nanosTimeout)

事实上,三种变种的具体实现结构差不多,因此我们先集中剖析acquire获取。函数源码如下。

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire由子类重新实现逻辑,一般子类可以通过判断当前是否已经有线程获取了锁,或者判断当前线程是否已经获得了锁(可重入)等等,通过判断当前线程能否获得锁,返回了true,证明当前线程获取了锁,也就不需要竞争,会直接退出函数,如果返回了false,则需要把当前线程加入等待队列。

加入等待队列是通过addWaiter实现,函数源码如下

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

这里,我们赋予参数mode为Node.EXCLUSIVE,查看定义是一个null的Node,表达这是在独占模式。(在共享模式下,这是一个经过初始化的final空结点,用于区分独占结点)。把当前线程以及模式作为参数构造一个新的Node之后,首先尝试从tail结点插入,如果tail结点没有初始化,或者产生了竞争的时候,就会调用enq函数利用循环CAS来保证从tail插入结点。

enq函数实现如下

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//head,tail初始化后:
+------+
head | dummy| tail
| node |
+------+

//添加第一个结点:
+------+  prev +-----+
head | dummy| <---- |     | tail
| node |  next |     |
+------+ ----> +-----+
这是一个循环的CAS,循环内部会检查tail,由于这里是产生竞争才会执行,所以tail为null即head和tail还没有进行初始化,于是把tail和head引用一个无效的空结点。当初始化完成后,就是典型的CAS把当前结点插入到tail,成功就会返回到上一步。注意,这时head引用的还是无效空结点,tail指向实际的结点。

addWaiter把当前线程入队后,就会到acquireQueued函数,代码如下

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先该函数主体也是一个循环和条件判断来避免竞争带来的问题。在循环内部,首先获取调用函数predecessor获取祖先结点,函数实现里有一个null判断prev,会抛出NullPointerException(注释作用为help the VM,个人感觉方便从异常里查找问题)。然后判断p == head是由于头结点的直接后继才是第一个能够重新竞争的线程,如果是头结点的直接后继结点,这里再次重新判断tryAcquire来尝试获取锁,如果成功的话,则会调用setHead把head指向当前结点,然后返回。如果tryAcquire失败,则会通过shouldParkAfterFailedAcquire把当前线程的waitStatus的状态标识设置为SIGNAL,然后返回false,再重新尝试获取锁,如果再次失败,则会进入parkAndCheckInerrupt,把当前线程进入阻塞状态,直到release把当前线程唤醒,然后再次循环进行tryAcquire获取锁,除非成功退出或者失败再次进入阻塞,这样做可以避免多线程竞争锁带来的问题。

先看看shouldParkAfterFailedAcquire实现。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
传入的参数是当前结点以及其前继结点。由于默认的时候waitStatus为0,因此第一次调用的时候会把waitStatus设置为SIGNAL,如果waitStatus大于0(CANCELLED状态),则会迭代查找前继结点。返回true,则会在往后的操作里进行阻塞状态,如果返回false则会进入上一步的循环重新尝试获取锁。因此这个函数主要在更改前继结点状态到SIGNAL。这样做的目的,对比其更改到SIGNAL后直接返回true,是为了避免多线程竞争的问题,这个竞争与下面释放锁有关,详细例子见下面释放锁release的剖析。

如果shouldParkAfterFailedAcquire返回true,则会执行parkAndCheckInterrupt,代码如下

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

LockSupport.park会把当前线程进行阻塞状态。同时返回当前线程是否被interrupt。

另外,我们看看如果发生了异常退出循环,需要取消正在获取锁的行为时,调用了cancelAcquire。

private void cancelAcquire(Node node) {
if (node == null)
return;

node.thread = null;

Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next;

node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {

int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

首先,如果node为null则不做任何事情直接return。然后,通过while循环判断,直接前继结点的waitStatus > 0(也就是CANCELLED)来跳过已经被取消来的前继结点,然后predNext显然就是需要取消链接的结点(通过CAS替换next域),但由于多线程竞争存在,因此下面的CAS更改可能会失败,也就是说在与另外的cancel或者signal操作中竞争失败,因此可以不需要另外的操作也不会影响。给node.waitStatus
= Node.CANCELLED可以不需要CAS,这样其它结点在判断中就会跳过该结点。

接下来判断,如果被取消的结点刚好是tail结点,并且CAS可以成功把tail结点替换成非取消的前继结点pred,则同时利用CAS把pred的next域替换为null。如果被取消的结点不是tail结点,则会再次判断,如果该结点并非头结点的直接后继(因为可能需要被唤醒)并且pred的后继结点需要唤醒(也就waitStatus为SIGNAL或者为<=0),这样就把被取消结点的next域赋给非取消前继结点pred的next域。否则的话我们便唤醒被取消结点node的后继结点,尝试获取锁。

(2)release释放锁流程

这样大致了解来acquire获取锁的流程,我们来看看release释放锁究竟做了什么。

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

首先判断tryRelease是否允许释放锁,一般子类重写tryRelease会根据需要判断能否释放当前锁中的等待队列中的其他结点。当tryRelease返回true的时候,则会判断head结点是否为null并且head的waitStatus是否为非0,如果判断成功,则会调用unparkSuccessor来释放后继结点。

注意,这里release锁便与上面的获取锁先更改SIGANL,然后再返回循环重新获取锁的做法有竞争问题,我们先假设shouldParkAfterFailedAcquire函数更改了SIGNAL状态后直接返回true,把当前线程进入阻塞,即

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
...
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

//更改了SIGNAL之后返回true,便把当前线程进入阻塞状态
return true;
...
}
那么当我们在获取锁的时候竞争失败,进入了shouldParkAfterFailedAcquire之后,此时,release函数被调用,判断waitStatus由于仍然没改变SIGNAL,h.waitStatus == 0,然后就会退出函数,但此时shouldParkAfterFailedAcquire仍然执行,把当前线程阻塞了,这样,就会由于并发问题进入死锁状态。因此,shouldParkAfterFailedAcquire采取的做法是先将状态更改为SIGNAL,然后返回false,这样返回上一个堆栈里的循环,就会还有一次tryAcquire的机会,这时当前线程便可以成功获取了锁,避免了死锁状态。

我们回到release函数,判断成功后,执行unparkSuccessor,代码如下。

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

函数首先判断waitStatus,如果为负,则变为0,表明准备唤醒后继结点,然后一般情况下后继结点就是next域,但由于竞争存在,因此可能被cancelled或者为null,因此需要从tail到head进行遍历找出真正的没有被cancelled的后继结点。最后则调用LockSupport.unpark把线程唤醒,被唤醒的线程就会在acquireQueued的循环里再次重新尝试获取锁,这样在循环里重新获取锁的时候,由于有可能此时有外部线程调用acquire获取锁,这样两者竞争,如果失败的话,则失败的一方就会进入阻塞状态。

这样一来,acquire获取锁和release释放锁的流程大致清楚了,接下来看看其他变种的实现。

(3)acquire的其它变种。

首先看看acquireInterruptibly变种。

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出,在acquireInterruptibly的开始添加了Thread.interrupted判断后,抛出异常,以及在parkAndCheckInterrupt里返回true抛出异常外,大体与acquire没有区别。

我们再来看看tryAcquireNanos变种。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos里同样多了一个Thread.interrupted的判断,而doAcquireNanos则稍微有点不一样,除去一些超时的判断之后,一个很关键的判断nanosTimeout > spinForTimeoutThreshold,按照注释,spinForTimeoutThreshold是一个大致的阀值,当小于这个阀值的时候,旋转一次的时间比parkNanos会比更加快,此时选择旋转的话,会提高非常短时间的反应性,也就是说会更加精确达到超时控制。

这样,本次代码剖析第一步就完结了,在这里,我们先大致介绍了AQS的框架以及实现原理,然后从代码上剖析了获取锁和释放锁的具体实现原理,从实现上可以看到对于多线程并发要注意的竞争问题。下一步将会进行关于共享模式的获取锁和释放锁的分析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: