您的位置:首页 > 编程语言 > Java开发

java独占锁ReenTrantLock的实现

2015-12-26 18:08 489 查看
在jdk1.5之后,新增了Lock接口以及ReenTrantLock的实现类来代替同步原语sychronized,相较于sychronized同步原语,Lock接口的实现提供了:

获取锁的可操作性

尝试非阻塞的获取锁

可中断的获取锁

超时获取锁

以上的这些功能,是sychronized同步原语所不具有的功能,在需要使用高级特性的锁时,就需要使用jdk提供的lock接口来实现。

一:同步器AQS

java AQS类结构的实现是使用模板方法设计模式来实现的,在顶级抽象类AQS中,定义了若干同步状态获取和释放的方法,如下:

模板方法

独占锁的获取和释放同步状态方法


public final void acquire(int arg);
public final void acquireInterruptibly(int arg)  throws InterruptedException ;
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException;
public final boolean release(int arg);


共享锁的获取和释放同步方法


public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException;
public final boolean releaseShared(int arg);


获取同步队列的方法


public final Collection<Thread> getQueuedThreads();
public final int getQueueLength();
public final boolean hasQueuedThreads();
.........


子类重写方法

独占锁

protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg) ;


共享锁


protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);


同步器是否被当前线程所占有


protected boolean isHeldExclusively();


那么使用该同步器AQS,就可以同时实现独占锁和共享锁(通过继承AQS并重写对应的获取和释放同步状态的方法)。

在AQS中,使用一个FIFO双向队列来存放当前线程,队列的类结构为:

static final class Node {
static final Node SHARED = new Node();
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {    // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) {     // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}


在该节点类中,

prev和next为当前节点的前驱和后继结点的引用。

thread为当前线程的引用。

waitStatus为当前节点中线程的等待状态,

- 0 当一个节点初始化时为
- -1 表示后继结点线程处于等待(被阻塞),若当前节点的线程释放了同步状态或者被取消,将会通知后继结点的线程,使后继结点得以运行(在获取锁和释放锁的时候非常重要)。
- 1 表示处于同步队列中的等待线程被取消或者被中断,需要从同步队列中取消等待状态。
- 还有几个状态后续分析


二:ReenTrantLock同步工具类的实现

该同步工具类通过聚合一个队列同步器AQS的子类,来向上层lock的实现提供:

同步状态管理 通过一个整型的变量private volatile int state;

线程的同步 通过一个FIFO的双向队列(使用CAS保证在队列尾部线程安全的添加节点)

等待与唤醒线程 通过使用 LockSupport组件提供的阻塞线程功能来阻塞与唤醒节点中的线程

ReenTrantLock类聚合一个继承自AQS同步器的抽象子类

abstract static class Sync extends AbstractQueuedSynchronizer,并使用两个子类来实现该Sync类:

static final class NonfairSync extends Sync 非公平锁,ReentrantLock类使用的默认AQS实现

static final class FairSync extends Sync 公平锁

三:ReentrantLock的lock与unlock分析

假设有三个线程A,B,C,D同时请求获得锁资源:

当ReentrantLock类的lock方法被调用时

public void lock() {
sync.lock();
}


会调用AQS的模板方法lock(),继而会调用ReentrantLock默认实现的非公平锁实现方式,调用

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}


1,在该方法中,若:

A线程首先请求获得该同步状态,那么就会以CAS的方式,原子更新同步状态为1,并获得该同步状态。

B线程,C线程,D线程在请求获得锁的时候,就会调用AQS的模板方法acquire(1)方法。

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


2,此时B,C,D线程进入acquire方法之后,首先调用tryAcquire(1)方法,该方法为ReenTrantLock类中同步器AQS的子类重写方法(非公平锁):

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}


3,接着就会调用AQS的私有方法addWaiter(Node.EXCLUSIVE)创建独占式的同步队列节点:

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//1
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {//2
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


若假设请求顺序为B线程先,C,D线程后

- 首先根据当前线程创建同步节点,且waitStatus同步等待状态初始化为0。

- 当创建B线程对应的节点之后,尝试快速添加失败,会调用enq(node)方法。 在该方法中,设置了一个自旋式的CAS更新方式将AQS的同步队列的尾节点引用tail指向新添加的节点。在第一次自旋中,会初始化一个空节点为首节点,将head和tail都指向该首节点。

在首节点初始化之后,1处的CAS原子更新尾节点和2处的自旋式原子更新尾节点会保证将节点线程安全的添加到同步队列。此时同步队列如下:



线程B,C,D均被放入同步节点被添加到AQS的同步队列中,并且AQS的head引用指向一个空节点,节点的同步等待状态waitStatus=0。

4,对于每个线程以及其对应的节点,接下来会被调用acquireQueued(addWaiter(Node.EXCLUSIVE), 1)方法,该方法会将当前节点自旋两次(若非首节点),第一次将前驱节点的waitStatus更改为-1,第二次使当前线程等待:

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}


对于D节点,首先获取前驱节点C,在第一次自旋的时候,由于非首节点,并且获取同步状态失败,会运行shouldParkAfterFailedAcquire(p, node)方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


在该方法中,由于前驱节点的waitStatus=0,故会将前驱节点的waitStatus以CAS的方式更新为-1,表示若前驱节点释放同步状态或者被取消,通知并唤醒后续节点,返回false,第一次循环结束。

第二次循环:同样非首节点,并且获取同步状态失败,接着运行shouldParkAfterFailedAcquire(p, node)方法,第二次运行该方法时,获取前驱节点waitStatus=-1,直接返回true。继而运行 parkAndCheckInterrupt()方法:

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}


该方法中,首先阻塞当前线程,然后判断中断标志位。至此,D线程被阻塞,等待前驱节点中的线程通知,将其唤醒。

同理B,C线程会经过同样的过程。且B,C线程都被阻塞

AQS同步器的同步队列状态为:

`


若此时A线程释放持有的同步状态资源,在A线程中调用

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}


释放同步资源,那么同步状态state在tryRelease(arg)方法中被更新为0,该方法为AQS同步器子类重写的方法

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}


接着获取首节点head,并且满足if判断(首节点为非空节点,并且首节点的同步状态被后继结点在第一次自旋时更新为-1),进入 unparkSuccessor(h)方法

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}


对于首节点 head,其ws=-1,接着获取后继结点B,将节点B中的线程B唤醒。至此,unlock()方法的两个步骤结束(释放同步状态,唤醒首节点的后继结点)。

节点B中的线程B被唤醒之后接着运行 final boolean acquireQueued(final Node node, int arg) 方法,并满足if判断条件,继而执行

setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;


将节点B置为首节点,并从acquireQueued()自旋方法中return。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: