您的位置:首页 > 编程语言 > Java开发

java并发包-ReentrantLock(一):lock()是如何工作的

2017-03-15 13:03 218 查看
(本文源代码来自openjdk1.8,有错误和不足欢迎讨论指正)

ReentrantLock是java并发包提供的最常用的显示锁,除了实现内置锁的基本功能,还允许实现一些高级功能。
ReentrantLock的声明如下:


public class ReentrantLock implements Lock, java.io.Serializable {
//内容忽略
}


可以看到它实现了Lock接口与Serializable接口。Lock接口是各个显示锁的基础接口,包含以下方法:


public interface Lock {
//请求获取一个锁,获取成功立即返回,否则阻塞至成功获取
void lock();
//同上,但允许阻塞时被中断,从而放弃获取
void lockInterruptibly() throws InterruptedException;
//请求获取锁,立即返回,表示成功或失败
boolean tryLock();
//在一定时间内请求获取锁,成功则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//创建一个新Condition
Condition newCondition();
}


ReentrantLock的实现依赖于其内部类,Sync,NonFairSync和FairSync。Sync是AbstractQueuedSynchronizer(这个类实现了一个线程的FIFO队列,之后会详解)的子类,而另外两个类均继承自Sync,表示公平的和非公平的锁。接下来以实际代码为例, 当我们调用lock.lock()方法时,它实际将请求委托给了其内部的Sync对象:(调用默认构造方法,sync指向一个非公平sync对象,调用ReentrantLock(boolean fair)方法可以使sync指向公平sync对象,此处以非公平为例):


public void lock() {
sync.lock();
}


此时进入NonfairSync查看其lock()方法:


final void lock() {
if (compareAndSetState(0, 1))//使用cas指令尝试获取锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//进入正常流程获取锁
}


非公平的锁第一步尝试使用cas操作修改state的值并来占有锁(CAS是Compare And Swap的简称,比较并交换,是由机器提供的一条***原子***指令,接受3个参数,内存地址,旧值,新值。此处的方法封装了Unsafe的cas相应方法,如果state的为旧值,修改其为新值,返回true,否则返回false。state是类AQS中的状态变量,在ReentrantLock的实现中,内部对象sync的state值等于零时,代表没有线程占有锁,否则代表该锁已经被某线程持有,值代表重入次数),如果cas操作成功了,代表锁获取成功,设置相应状态,lock()方法立即返回,线程可执行lock()后面的代码。
如果抢占(插队)失败了,那么就需要进入正常流程,排队获取锁。(在公平锁的实现中,没有上一步cas抢占操作,因此后来者不能插队,只能老实的排队)。acquire(1)的流程如下:


public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


tryAcquire()方法在AQS中基本是个空实现(直接抛异常),由子类实现,以NonFairSync为例,它将的作用是直接调用Sync类的tryNonFairAcquire()(FairSync将调用自己实现的的tryAcquire(),非公平的try如下:


final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//当前没有线程持有锁,插队抢占
if (compareAndSetState(0, acquires)) {//抢占成功
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//我已拥
//有此锁,重入
int nextc = c + acquires;
if (nextc < 0) // 重入次数太多,上溢出啦..........
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//本线程既没有获取也没有重入,返回false代表失败
}


非公平的实现还是老样子先直接抢占一次,公平的实现如下:


protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//少了CAS抢占
if (c == 0) {//当前锁无人持有
if (!h
4000
asQueuedPredecessors() &&//我前面已经没人排队
compareAndSetState(0, acquires)) {//锁获取成功
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//既没有获取锁也没有重入
}


回到acquire()方法,如果tryAcquire()返回true,代表该线程获取了锁,acquire方法返回,线程最开始的lock方法也可以返回了。如果tryAcquire返回false,那么意味着锁获取失败,当前线程要去排队了,因此调用addWaiter和acquireQueued方法,看其实现:


private Node addWaiter(Node mode) {
//将当前线程包装成一个节点,mode是等待的模式,有共享和排它(独占)两种
//reentrantlock是排它锁,所以之前传入了Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// 找到尾节点
Node pred = tail;
if (pred != null) {
//自己的前节点指向尾节点
node.prev = pred;
//尝试把尾节点指向自己,也就是把自己插入到队伍最后
//因为可能有多个线程都在试图将尾节点指向自己,使用cas操作处理竞争
if (compareAndSetTail(pred, node)) {
//竞争成功的线程,将原尾节点的后继节点指向自己,入队正式成功
pred.next = node;
return node;
}
}
//竞争失败的线程,或者队列为空时在enq()方法中完成入队
enq(node);
return node;
}

//在上一个方法失败的线程进入此方法入队,如果队列为开始状态,也进入此方法
//因为队列采用了延迟初始化,使用时才初始化
private Node enq(final Node node) {
for (;;) {//无限循环,直至cas成功入队,如果够倒霉,可能饥饿
Node t = tail;
if (t == null) { // Must initialize初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


addWaiter方法之后,线程代表的节点就进入了队伍中了,这个时候就会调用acquireQueued方法对线程进行阻塞,唤醒,争用锁了,(这个方法是AQS中的核心):


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//记录线程是否被中断
for (;;) {
final Node p = node.predecessor();

/*如果我的前节点是队伍头,那么我要竞争锁
因为AQS维护了一个FIFO队列,头结点代表持有锁的线程,当头节点
释放锁的时候,会唤醒其后继节点竞争锁(在nofair下有非公平的线程也参与竞争)
如果当前节点的前节点不是头结点,可能有以下原因:
1.本线程第一次进入此循环
2.本线程被interrupt,从parkAndCheckInterrupt()醒来,因为是死循环,所以执行到这,等会还会进入parkAndCheckInterrupt()阻塞
3.当前线程的park()无理由返回(后序会有说明)
*/

if (p == head && tryAcquire(arg)) {//竞争成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*如果在上一步竞争失败了,那么先通过shouldParkAfterFailedAcquire判断是否挂起,如果是,就通过parkAndCheckInterrupt挂起线程,**线程在此阻塞***/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//记录我被中断过
}
} finally {//如果出现了奇怪的状况,导致线程竞争失败就
//取消线程请求。奇怪是因为:死循环的唯一退出路径就是成功
//获取到锁,这时竞争是成功的,if判断里的语句似乎永远不会执行
//这段逻辑也希望有人能解读一下?
if (failed)
cancelAcquire(node);
}
}


判断是否挂起的代码很简单,如下:


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//waitStatus是一个记录节点状态的变量
//SIGNAL
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//如果前节点等待被唤醒,意味着本节点也应该挂起等待

return true;
if (ws > 0) {//为1,代表CANCEL,即前节点被取消,将前节点移除队列
//从前节点位置往前开始寻找找到一个没有取消的节点作prev
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前节点的状态是0或者传播,这在共享锁中有此状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//不能挂起,重新尝试获取锁
return false;
}


如果当前节点的上一个线程等待被唤醒,那么当前节点也应当挂起。如果上一个节点被取消了,那么我们应当重新寻找一个前节点,并且不能挂起,因为我们寻找到的新前节点可能是队列头,这意味着需要进行一次判断并竞争锁。否则,前节点处于0或传播状态,在共享锁实现时会由此情况,这里暂且不提。
判断是否应当挂起后,就可能会将线程挂起(前者返回true)


private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//线程挂起,阻塞在这里
return Thread.interrupted();//执行到这里意味着park()返回,阻塞被解除了,返回true代表线程被中断而解除了阻塞
}


LuckSupport提供了线程阻塞/解除阻塞的基本原语。park()方法将线程挂起,直至许可可用;unpark()方法为线程发放一个许可。我查阅了JavaSE8 API,对LockSupport的park方法说明如下:
public static void park()
Disables the current thread for thread scheduling purposes unless the permit is available.


If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

Some other thread invokes unpark with the current thread as the target; or

Some other thread interrupts the current thread; or

The call spuriously (that is, for no reason) returns.

This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

总结一下,park方法返回有几个原因:
1.其他线程对此线程unpark()
2.其它线程中断了此线程
3.无理由的返回,这个最让我感到迷惑。阅读了一篇其它博主的博文,其原因是:当多个线程在一个对象的不同条件上等待时,某线程调用了此对象的唤醒,所有线程都将被唤醒,但实际只有部分线程是真的在这个条件上等待,其它线程是被无理由唤醒了,但是park方法不会管自己是如何返回的。(有点类似于wait和notifyAll)
举例说明park()方法的使用,
1.假设线程A调用了park()方法,那么A会阻塞,线程B在随后调用了unpark(A)或者A.interrupt()后,A线程解除阻塞,park返回;
2.线程B先对A unpark(A),线程A park()时无需阻塞,直接消耗此许可并返回.
3.许可不记数量,只有0或1,即一次park或消耗所有已有许可或者阻塞至第一个许可到来.

回到parkAndCheckInterrupt方法,线程阻塞在park()上,当park返回时,可能是:
线程被发放了许可,因为只有释放锁的线程可以发放许可,这意味着当前线程可在acquireQueued死循环的第一个if条件中竞争锁。
线程被外部中断了,但是lock方法不响应中断,因此醒来后还会继续回到parkAndCheckInterrupt方法阻塞,而Thread.interrupted()记住当前线程是否被中断了,并清除中断标记,之后将中断状态记录在interrupted变量里。
无理由返回,线程被假唤醒,真正的条件还没到,重新循环到parkAndCheckInterrupt挂起.
由上面的源代码可以知道acquireQueued中线程的状态转换:
检查条件是否竞争锁->挂起->被唤醒->再次检查条件是否竞争锁,直至有一次竞争成功,这也就是lock()方法实现阻塞与唤醒的原理
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息