您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer(AQS源码解读)

2019-06-12 18:05 465 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_42054277/article/details/91867537

AbstractQueuedSynchronizer(AQS源码解读)

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。

双向链表

head->A->B->C->D

tail->D->C->B->A

参数及代码块


// 获取Unsafe类的实例,用于对内存进行操作(CAS操作)
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 在内存中的偏移量
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
try {
// 获取偏移量
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}
[/code]

AbstractQueuedSynchronizer.Node

Node为内部类,数据结构为双向链表。

compareAndSetState

如果期望值和更新值不一样,则返回false。

    /**
* 比较并且设置状态
* @param expect 期望值
* @param update 更新值
*/
protected final boolean compareAndSetState(int expect, int update) {

// 通过unsafe中的原子方法来设置
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
[/code]

unsafe.compareAndSwap**
方法是基于JNI的原子操作

acquire

    public final void acquire(int arg) {
// 如果当前线程未获取到锁(即被其他线程占有),把当前线程加到队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
[/code]

tryAcquire

子类必须重写

tryAcquire
方法,不然会抛出异常。

    protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
[/code]

addWaiter

放入尾节点

    private Node addWaiter(Node mode) {

// 创建node节点
Node node = new Node(Thread.currentThread(), mode);

// tail(尾节点)
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 如果尾节点设置成功,直接返回创建的node节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
[/code]

enq

    private Node enq(final Node node) {

// 一直循环去获取,直到尾节点设置成功才返回
for (;;) {
// 双向链表的知识
Node t = tail;
// 尾节点为空时,默认初始化头节点=尾节点=空节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// node的前驱节点指向为当前的尾节点
node.prev = t;
// 将尾节点设置成node节点(新的尾节点)
if (compareAndSetTail(t, node)) {
// 当前尾节点的后继节点指向为node节点
t.next = node;
// 返回旧的尾节点
return t;
}
}
}
}
[/code]

acquireQueued

这里有三种状态

  1. pred.waitStatus => 0, 返回 interrupted => false
  2. pred.waitStatus => -1, 返回 interrupted => false
  3. pred.waitStatus => -1, 线程一直被挂起, 直到锁被释放(release), 返回 interrupted => true
    final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;
try {

// 是否中断
boolean interrupted = false;

for (;;) {
// node的前驱节点
final Node p = node.predecessor();

// 当p为头节点,并且获取到锁  【FIFO】先进先出
if (p == head && tryAcquire(arg)) {

// 设置头节点为node
setHead(node);

// 删除p后继节点的引用
p.next = null;
failed = false;
return interrupted;
}

// 线程一直被挂起,直到上面的if成立
// 下面分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 已中断
interrupted = true;
}
} finally {

// 一旦发生异常,则会进入
if (failed)
cancelAcquire(node);
}
}
[/code]

shouldParkAfterFailedAcquire

第一次会进入else,将waitStatus设置为

Node.SIGNAL
-1
,返回
false
,不会执行
parkAndCheckInterrupt

第二次及之后进入直接返回
true
,就会执行
parkAndCheckInterrupt

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

// 默认为0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE.  Indicate that we
* need a signal, but don't park yet.  Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
[/code]

parkAndCheckInterrupt

为什么要返回当前线程的中断标识呢?因为

LockSupport.park()
会响应线程中断。
即,当线程中断时,无论是
LockSupport.unpark()
还是
Thread.interrupt()
,都会马上执行下面的
return Thread.interrupted()

    private final boolean parkAndCheckInterrupt() {

3ff7
// 阻塞线程
LockSupport.park(this);
// 返回当前线程中断标识
return Thread.interrupted();
}
[/code]

LockSupport

park

    public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
// 设置线程无限阻塞
UNSAFE.park(false, 0L);

// 阻塞时不会执行
setBlocker(t, null);
}
[/code]

setBlocker

设置t线程的

parkBlocker
属性,记录线程是被谁阻塞的。

    private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
[/code]

一个LockSupport的demo

import java.util.concurrent.locks.LockSupport;

public class LockSupportDemo {

public static Object o = new Object();

public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread1();

t.start();

Thread.sleep(4000);
LockSupport.unpark(t);
}

static class MyThread1 extends Thread {

@Override
public void run() {

System.out.println("等待挂起");

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("已挂起");
System.out.println("等待唤醒");
LockSupport.park(o);

System.out.println("已唤醒");
}
}
}
[/code]

输出为

等待挂起
已挂起
等待唤醒
已唤醒

思考?

LockSupport.park()和Object.wait()的区别?

....下次补充

Thread.interrupted()和Thread.isInterrupted()的区别?

....下次补充

cancelAcquire

    private void cancelAcquire(Node node) {

if (node == null)
return;

// 删除挂载的线程
node.thread = null;

Node pred = node.prev;

// 把node的前驱节点指向挂载到没有被CANCELLED的节点上
// 为什么不判断pred不为null呢?因为在enq()方法里将节点插入到队列的时候就已经初始化过了
/**
* private Node enq(final Node node) {
*     for (;;) {
*         Node t = tail;
*         if (t == null) { // 如果为null就初始化
*             if (compareAndSetHead(new Node()))
*                 tail = head;
*         } else {
*             node.prev = t;
*             if (compareAndSetTail(t, node)) {
*                 t.next = node;
*                 return t;
*             }
*         }
*     }
* }
*/
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next;

// 将node节点的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;

// 如果node是尾节点,将尾节点设置为node的前驱节点
if (node == tail && compareAndSetTail(node, pred)) {

// 将node前驱节点的后继节点指向设置为null,目的是为了切断与node节点的联系
// pred.next设置为null
compareAndSetNext(pred, predNext, null);
} else {

int ws;
// node的前驱节点不是头节点
// 将node的前驱节点的状态设置为Node.SIGNAL,如果已经是Node.SIGNAL则不需要设置
// pred.thread != null 这个是干啥的,疑问??
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {

Node next = node.next;
// node的后继节点不为null,状态不为CANCELLED
if (next != null && next.waitStatus <= 0)

// 将node的前驱节点的后继节点指向设置为node的后继节点
// 断开node节点的前后联系
// APrev - ANext - NodePrev - NodeNext - CPrev - CNext
//                 ||
//      APrev - ANext - CPrev - CNext
compareAndSetNext(pred, predNext, next);
} else {
// node的前驱节点是头节点,唤醒该节点线程
unparkSuccessor(node);
}

// node的后继节点指向设置为node
node.next = node; // help GC
}
}
[/code]

selfInterrupt

线程在等待的过程中被中断,不响应,需要补上中断。

    static void selfInterrupt() {

// 中断当前线程
Thread.currentThread().interrupt();
}
[/code]

release

  1. waitStatus ===>>> 0 默认值
  2. waitStatus ===>>> 1 Node.CANCELLED
  3. waitStatus ===>>> -1 Node.SIGNAL
  4. waitStatus ===>>> -2 Node.CONDITION
  5. waitStatus ===>>> -3 Node.PROPAGATE
    public final boolean release(int arg) {

// 尝试释放锁,返回true则表示已经释放
if (tryRelease(arg)) {
// 头节点
Node h = head;
// 头节点不为null 并且waitStatus不为0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
[/code]

tryRelease

子类必须重写

tryRelease
方法,不然会抛出异常。

    protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
[/code]

unparkSuccessor

唤醒队列中的头节点线程

    /**
* node为头节点
*/
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
// 状态不为CANCELLED
if (ws < 0)
// 将waitStatus设置为0
compareAndSetWaitStatus(node, ws, 0);

// node的后继节点
Node s = node.next;
// 后继节点为空 或者 状态是CANCELLED
if (s == null || s.waitStatus > 0) {

s = null;

// 找到一个有效节点
// 尾节点!=头节点,从尾节点找到头节点的下一个未被CANCELLED的节点
// 疑问?为什么从尾节点往前遍历,而不从前节点往后遍历??
// HeadPrev - HeadNext - APrev - ANext - (TailPrev - TailNext) => s=TailNode => t=t.prev=ANode
//                           ||                    t
//             HeadPrev - HeadNext - (APrev - ANext) => s=ANode => t=t.prev=HeadNode
//                           ||             t
// ------------------------退出循环---------------------
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// s就是找到的有效节点(头节点之后的第一个有效节点,因为头节点是空节点)
if (s != null)
LockSupport.unpark(s.thread);
}
[/code]

思考?

为什么从尾节点往前遍历,而不从头节点往后遍历?

因为head.next可能被设置为null

LockSupport

unpark

解除线程阻塞

    public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
[/code] posted @ 2019-06-12 18:05 jarjune 阅读(...) 评论(...) 编辑 收藏
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: