您的位置:首页 > 产品设计 > UI/UE

ReentrantLock 源码分析 - AbstractQueuedSynchronizer 详解(二)

2020-04-28 07:43 951 查看

前言

在上一篇 ReentrantLock 源码分析 - AbstractQueuedSynchronizer 详解(一) 是这样总结 AQSacquire(int arg) 方法的:

如果当前线程获取失败,就把它放入到 AQS 队列当中, 顺便挂起当前线程

这种说法显然比较粗, 实际上acquire(int arg) 内部还有很多细节,本文继续来进行探讨。

正文

整体思路如下:

  • 先来文字描述,对大概流程有个了解
  • 再配合图片,加强对文字描述的理解
  • 最后分析源码,验证整个逻辑

一、 几个重要属性

上一篇有提到, AQS 中有几个属性,

  • head 指向队列的头节点
  • tail 指向队列的伪节点

Node 的几个属性

  • prev 前驱指针
  • next 后继指针

熟悉数据结构的话, 对这些命名,名词都不会陌生

二、 acquire(int args) 方法的执行流程

2.1 文字描述

AQS 队列的 头节点 是不参与锁竞争的, 可以称之为 “哨兵节点”。AQS 初始化时, headtail 会指向 null

  • 当一个线程 thread1 竞争锁失败,会被作为一个 node 放入 AQS 队列

    如果 队列为空

    首先,会初始化一个 “哨兵节点”,并将 headtail 指向它
  • 然后, nodeprev 指针 指向 “哨兵节点”, “哨兵节点” 的 next 指针指向 node
  • 如果 队列不为空,直接把 node 放入队尾

  • node 入队成功后, 如果 nodeprev 指针 指向队列的 head 节点,那么 node 就可以尝试去获取锁了。

      如果 node 尝试获取锁成功, 那么 node 就会成为新的 “哨兵节点”,
    • 然后,原来的“哨兵节点”会指向 null, 等待 GC
  • 如果 node 的前驱节点不为 head ,或者 node 获取锁失败

      去根据 node前驱节点 的线程状态去剔除部分节点,比如 有些线程被取消了,就该被剔除。
    • 然后通过 LockSupport.park()方法挂起当前线程, 并返回线程是否被中断
    • 继续循环

    2.2 图解说明

    1. AQS 队列初始状态

    1. 当一个线程 thread1 获取锁失败, 如果此时队列为空,会初始化一个“哨兵节点”。

    1. 然后把 thread1 作为一个 node 放入 队列中

    1. 最后会去判断 node前驱节点 prev 是否为 head 节点, 如果是,就会尝试获取锁,并把当前节点作为头节点。

    2.3 源码分析

    2.3.1 acquire(int arg)
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
    2.3.2 tryAcquire(arg)

    该方法之前也有提到,是一个抽象方法,由具体子类去实现。

    不管是谁来实现, 大致意思就是尝试去获取锁,返回 true / false。

    我们来看ReentrantLock非公平锁 部分是如何实现的

    final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取 state 值
    int c = getState();
    // 如果state == 0, 表示锁未被占用
    if (c == 0) {
    // cas 操作,state + 1, 标记锁被占用
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 如果 state !=0, 就看当前线程是否就是 占用锁 的那个线程
    else if (current == getExclusiveOwnerThread()) {
    // 重入次数 + 1
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }
    2.3.3 addWaiter(Node node)

    该方法的主要逻辑就是把 node 放入队列的尾部,就是一个入队操作,然后做了些优化设计。

    // mode 为 Node.EXCLUSIVE, 前面有提到,该 node 标记为抢占独占资源
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 这里很巧妙,先提前一步判断队列是否为空,如果不为空,直接将 mode 放入队列中
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    // 正常的思路就只调用这个 入队 方法即可
    enq(node);
    return node;
    }
    2.3.4 enq(final Node node)

    这个方法的最终目的是:当多个线程同时需要进入队列时,保证所有 线程 最终能有序的入队,防止出现并排站多人等情况。

    // 入队
    private Node enq(final Node node) {
    // 死循环,保证所有节点有序入队
    for (;;) {
    Node t = tail;
    // 如果队列为空
    if (t == null) { // Must initialize
    // 队列为空,新增一个哨兵节点
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    // 这里面的逻辑就跟上个方法一样, 队列不为空,就入队
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
    2.3.4 acquireQueued
    // node 为入队的节点
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    // 获取 node 的前驱节点
    final Node p = node.predecessor();
    // 判断 node 的前驱节点是否为 head, 前面提到 head 不参与锁竞争
    // 如果 node 的前驱节点是 head,那么 node 就去尝试获取锁
    if (p == head && tryAcquire(arg)) {
    // node 节点获取到锁后, 自动成为 head 节点,用来做哨兵
    setHead(node);
    // 而之前的哨兵节点, 会指向 null,然后被 GC
    p.next = null; // help GC
    // 标记此步骤,没有失败
    failed = false;
    // 表示当前线程未被中断
    return interrupted;
    }
    
    // 如果 node 的 前驱节点不是 head 节点,或者 获取锁失败了,
    // 就根据某种条件,就挂起当前线程
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    2.3.5 shouldParkAfterFailedAcquire
    //  获取线程状态
    int ws = pred.waitStatus;
    // 如果线程在等待被唤醒(Node.SIGNAL==-1),返回 true
    if (ws == Node.SIGNAL)
    return true;
    // 如果线程被取消,跳过该节点
    if (ws > 0) {
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    // 要是等于0 说明这是一个新加入的节点,给一次 tryAcquire 的机会,
    //要是 <-1 那说明这个节点是 await 进来 or 这是个共享传播锁,那必须在给 1 次  tryAcquire  机会
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    2.3.6 parkAndCheckInterrupt
    private final boolean parkAndCheckInterrupt() {
    // 前面有提到, 该方法会挂起线程
    LockSupport.park(this);
    // 判断线程是否被挂起
    return Thread.interrupted();
    }

    总结

    本文详细分析了 AQSaquire(int arg) 方法,该方法的主要目的是获取独占资源,再来总结下详细的步骤:

    1. 线程 A 通过 tryAcquire(int arg) 方法尝试去获取资源,返回 true/false,true 为成功,反之失败, 由具体子类去实现。
    2. 如果第 1 步失败了,就把线程 A 放入队列,具体看上文。
    3. 入队成功后,检查下 线程 A 所在节点 的上一个节点是否是头节点(因为头节点不参与抢占资源), 如果是,线程 A 就会再次去尝试获取资源。返回值为 interrupted,默认为 false。
    4. 第 3 步,如果失败了,会一直循环去执行,直到执行成功。
    5. 在第 4 步执行的过程中,会判断当前线程是否被中断了,如果是,则 interrupted = true
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐