您的位置:首页 > 其它

ReentrantLock的可重入特性

2019-10-19 18:08 1826 查看

自旋分布式锁实现 中我们已经分析了ReentrantLock的自旋特性,现在我们来分析一下它的可重入特性。

可重入特性其实说白了就是当获得锁的线程解锁后,重新来获取锁的时候会判断自己以前是否获取过锁,如果获取过就无需竞争,直接获取。

我们再来继续跟进非公平锁的加锁代码

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
        //可重入的主入口,判断是否需要参与无锁竞争
acquire(1);
}

在AbstractQueuedSynchronizer中

public final void acquire(int arg) {
    //如果获取锁失败且前置节点线程被唤醒
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //当前线程请求中断
selfInterrupt();
}
//tryAcquire在非公平锁中被重写了,所以这里可以不用考虑
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

我们来看一下重写后的tryAcquire()尝试获取方法

返回NonfairSync

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

在sync中

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
    //获取锁的状态值
int c = getState();
if (c == 0) {
        //如果是0,就进行无锁竞争,竞争成功的将当前线程设为AQS的独占主线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
    //如果不是0(但也可能不是1),判断AQS的独占主线程是否就是当前线程
else if (current == getExclusiveOwnerThread()) {
        //锁的状态值加1,表示当前线程第几次进入该锁,无需参与无锁竞争
        //为什么说是非公平锁,就是说拿到锁的线程可以不断的优先获取到锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

我们再来看一下acquireQueued()方法,返回AbstractQueuedSynchronizer中

final boolean acquireQueued(final Node node, int arg) {
    //建立是否获取锁失败标识
boolean failed = true;
try {
        //建立是否已经中断标识
boolean interrupted = false;
        //无限循环
for (;;) {
            //获取尾部节点的前置节点
final Node p = node.predecessor();
            //如果该前置节点为头节点,且当前线程获取锁成功
if (p == head && tryAcquire(arg)) {
                //将尾部节点设为头节点,并清空头节点的线程和前置节点
setHead(node);
                //清空头节点的下一个节点,表示节点队列只有一个节点
p.next = null; // help GC
                //表示当前获取锁成功,且未中断
failed = false;
return interrupted;
}
            //因为是无限循环,如果尾部节点的前置节点要唤醒了,且当前线程被中断了
            //中断标识被设为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
            //如果获取锁失败,取消当前线程节点获取锁,唤醒前置节点线程
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取尾部节点的前置节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果该状态为需要被唤醒状态,返回true
*/
return true;
if (ws > 0) {
/*
* 如果该状态为被取消状态,将尾部节点的前置节点前移,直到不是被取消状态的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果该状态不是被取消状态,通过无锁竞争,将尾部节点的前置节点的状态更新为被唤醒状态
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
    //阻塞当前线程,返回线程中断
LockSupport.park(this);
return Thread.interrupted();
}
private void cancelAcquire(Node node) {
//尾部节点不能为null
if (node == null)
return;
    //清空尾部节点的线程
node.thread = null;

//获取尾部节点的前置节点
Node pred = node.prev;
while (pred.waitStatus > 0)
        //如果该前置节点状态为被取消状态,将尾部节点的前置节点前移
node.prev = pred = pred.prev;

//获取前移后的前置节点的下一个节点
Node predNext = pred.next;

//更新尾部节点的等待状态为被取消状态
node.waitStatus = Node.CANCELLED;

//通过无锁竞争,将尾部节点设为之前尾部节点的前置节点,即移除现有的尾部节点
if (node == tail && compareAndSetTail(node, pred)) {
        //通过无锁竞争,将更新后的尾部节点的下一个节点设为null
compareAndSetNext(pred, predNext, null);
} else {
//如果node不是尾部节点了,即node在节点列表中被移除了
int ws;
        //更新后的尾部节点不为头节点且该尾部节点的等待状态为待唤醒状态(不为待唤醒状态也会被无锁竞争更新为待唤醒状态)
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
            //获取node的下一个节点
Node next = node.next;
            //如果该节点不为null且该节点等待状态不为取消状态
if (next != null && next.waitStatus <= 0)
                //通过无锁竞争,将该节点设为现在尾部节点的下一个节点
compareAndSetNext(pred, predNext, next);
} else {
            //如果更新后的尾部节点的等待状态为取消状态,唤醒前置节点中等待状态不为被取消状态的节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
//获取node的等待状态
int ws = node.waitStatus;
if (ws < 0)
        //如果该状态不为取消状态,更新为无状态
compareAndSetWaitStatus(node, ws, 0);

//获取node的下一个节点
Node s = node.next;
    //如果该节点为null或者该节点状态为被取消状态
if (s == null || s.waitStatus > 0) {
        //将该节点设为null
s = null;
        //从尾部节点开始向前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
            //如果遍历的节点不为被取消状态,获取该节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
        //如果该节点不为null,唤醒该节点的线程
LockSupport.unpark(s.thread);
}
static void selfInterrupt() {
    //当前线程尝试中断
Thread.currentThread().interrupt();
}

这里有个Node的参数,AQS的内部通过Node内部类一个个连接起来实现FIFO同步队列,它的各属性如下

static final class Node {
/** 共享模式 */
static final Node SHARED = new Node();
/** 独占模式 */
static final Node EXCLUSIVE = null;

/** 表示线程已被取消(等待超时或者被中断) */
static final int CANCELLED =  1;
/** 表示后继节点中的线程需要被唤醒(unpaking) */
static final int SIGNAL    = -1;
/** 表示结点线程等待在condition上(等待队列),当被signal后,会从等待队列转移到同步到队列中 */
static final int CONDITION = -2;
/**
* 表示下一次共享模式下同步状态会被无条件地传播下去
*/
static final int PROPAGATE = -3;

/**
* 当前节点的状态
*/
volatile int waitStatus;

/**
* 前置节点
*/
volatile Node prev;

/**
* 后置节点
*/
volatile Node next;

/**
* 当前节点的线程
*/
volatile Thread thread;

/**
* 下一个等待节点
*/
Node nextWaiter;

要用到的一个构造器

Node(Thread thread, Node mode) {     // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//获取前置节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

在AQS中包含了两个节点属性

/**
* 头部节点
*/
private transient volatile Node head;

/**
* 尾部节点
*/
private transient volatile Node tail;

而该参数被赋值为

private Node addWaiter(Node mode) {
    //初始化一个Node节点,其线程为当前线程,独占模式,这里就是一个要添加到节点队列的新节点
Node node = new Node(Thread.currentThread(), mode);
//获取AQS的尾部节点
Node pred = tail;
if (pred != null) {
        //如果该节点不为null,将尾部节点设为新节点的前置节点
node.prev = pred;
        //无锁竞争尾部节点,竞争到的节点变成节点队列的新尾部节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
    //如果节点队列没有节点,将自动创建头尾部节点,再竞争尾部节点
enq(node);
return node;
}
private Node enq(final Node node) {
    //无限循环
for (;;) {
        //获取尾部节点
Node t = tail;
if (t == null) { // Must initialize
            //如果尾部节点为null,表示节点队列还没有节点,初始化一个无参构造器节点
            //来参与无锁竞争头部节点
if (compareAndSetHead(new Node()))
                //竞争的新节点同时也放到尾节点,表示节点队列只有一个节点
tail = head;
} else {
            //如果尾部节点不为null,表示节点队列中有节点
            //将尾部节点设为带有当前线程,独占模式的节点的前置节点
            //由于这里是无限循环,所以这里节点队列中一定会有节点,没有节点将会在前面被创建
node.prev = t;
            //再无锁竞争节点队列的尾部节点,竞争到的节点将会变成节点队列的尾部节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: