您的位置:首页 > 编程语言 > Java开发

ReentrantLock 源码解析

2018-03-18 00:21 429 查看
关于如何实现锁,先思考几个问题?
1、【锁资源内存访问控制】
        多线程竞争的资源,如何保证在同一时刻,只被一个线程拥有,其他线程进入排队队列?
2、【等待队列模型】【公平竞争锁】【非公平竞争锁】
进入排队的线程,从【等待队列】出来,又是怎样开始获得锁的?此时获取锁,是【等待队列】的所有的线程竞争获取锁还是按照队列的顺序获取【锁】?
3、【线程之间的通信】
  可重入锁怎样实现这样一个场景?: 线程获取锁,然后等待某个条件一段时间之后,发现达成条件或超时后退出。
4、【条件等待队列】
   获取锁之后的线程,才能拥有条件【Condition】加入条件的等待队列,然后释放拥有的锁,等待其他线程获取锁,收到条件通知或超时退出等待队列。
5、【锁资源的等待队列模型】【锁的条件队列模型】
    如何让竞争变得有序?引入【排队机制】,那怎样设计每个正在排队的【人】节点 ?
6、【引入信号量模型】
     信号量,表示了当前拥有这个资源的【进程数】或【线程数】。

一、竞争资源控制

    jvm 的Unsafe 的 CAS 内存操作,竞争资源是 state 字段时,通过反射获取state 的Field 对象,
   compareAndSwapObject(stateField, int expected,  int updated ); java 虚拟机的Unsafe操作底层能确保在同一时刻只能有一个线程更新值。

二、排队的【节点 Node 】数据结构模型设计

1、状态定义,等待、取消、条件等待
2、哪个线程?thread 
3、排队的位置,前后是谁?形成了双向链式表
static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; // 排队的状态
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile Node prev;  // 排在前面的人
        volatile Node next;  // 排在后面的人
        volatile Thread thread; // 排队的实体是 哪个线程?
        Node nextWaiter;  条件等待队列

    }    
三、类设计解读与分析
  ReetrantLock 具体实现是通过内部类Sync 实现的。
 Sync 获取锁时,有两种实现公平锁、非公平锁。





四、公平和非公平锁在 lock()方法,实现有什么不同?
     NofairLock->lock()时,先是尝试获取锁,而不是去看看当前【排队的情况】

     FairLock 排队获取锁。
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1)) 先去修改state,若成功,则获取锁成功,保存当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //获取失败,再去排队。。信号量设置1. 父类实现
}public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取锁成功,直接OK.
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取失败,先加入排队队列,Thread进入while()循环等待。
selfInterrupt();// 尝试获取锁失败,且不间断的获取锁被中断了,执行interrupt。
}
/** 公平锁实现尝试获取锁
* Fair version of tryAcquire.  Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //信号量
if (c == 0) { 当前没有线程获取资源
if (!hasQueuedPredecessors() && // 前面没有排队的线程 并且 修改成功
compareAndSetState(0, acquires)) { 修改信号量为 1
setExclusiveOwnerThread(current); 保存当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //是不是当前线程已经获取了该资源?
int nextc = c + acquires; 是,信号量增加,可重入锁设计
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); 2
return true;
}
return false;
}
}
/**
* Performs non-fair tryLock.  tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { //直接修改信号量成功,不用查看前面是否有人。
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/** 不间断的方式,获取到锁 或 被中断
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) { 当前Node,信号量1
boolean failed = true;
try {
boolean interrupted = false; 没有被中断
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //循环的出口条件是:当前节点的前一个是头结点(是个空节点)
setHead(node); 排在第一个且获取锁成功。
p.next = null; // help GC,前一个节点出队列。
failed = false;
return interrupted; //获取成功 or 被中断
}
if (shouldParkAfterFailedAcquire(p, node) && //否则获取失败应该Park.
parkAndCheckInterrupt()) //检查中断否
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
五、此时发现,一个线程获取资源后,除非释放锁,不然另一个线程一直在等待中。如何实现线程之间交流?
在这里引入Netty 的一个请求、响应场景。
1、在Netty 中请求和响应是异步的,怎么才能转同步实现?引入加锁+条件等待机制,确保两个线程使用的是同一个对象既是同一个锁。
 Request  get(int timeOut) 获取请求,需要加锁等待,得到响应的通知后或者超时,释放锁退出。
 void setResponse() 响应Handler 接收到netty的返回数据时,通知等待的线程,释放锁退出。
private void doReceived(Response res) {
lock.lock();加锁 state=1,此时这个线程是怎么获取到锁的?
try {
response = res;
if (done != null) {
done.signal(); 条件通知,已经Response了。
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) { // 没有得到Response
long start = System.currentTimeMillis();
lock.lock(); //加锁,state =1,此时加锁了,Response线程是无法得到锁的,怎么被通知的?
try {
while (!isDone()) { 循环
done.await(timeout, TimeUnit.MILLISECONDS); 条件等待指定时间。
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;// 得到通知 或者超时
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();释放锁
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
1、get()操作加锁了,进入了条件等待,有没有释放自己的锁?
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); 进入条件等待队列
int savedState = fullyRelease(node);完全释放自己的锁getState()-getState()=0
final long deadline = System.nanoTime() + nanosTimeout; 截止时间
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) { 进入循环等待:false
if (nanosTimeout <= 0L) { 超时
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();剩余时间
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT; 再次获取到锁
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
*         returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively()) 当前线程 != 拥有锁的线程
throw new IllegalMonitorStateException();
Node first = firstWaiter; 条件等待队列里的第一个等待者
if (first != null)
doSignal(first); 通知
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) { 被通知的 条件等待线程
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); 等待线程 再次加入【排队Sync队列】可以继续去获取锁了。
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); 将 状态由【Condition条件等待】改为[0] -> 【signal通知】
return true;
}
2、有2种队列,一是【Sync 同步队列,既是获取锁资源的排队队列】;二是【锁的条件队列】。
3、通知机制实现
  (1)get() 线程【得到了锁】,然后进入了【条件等待】,【完全释放】了当前的锁,【加入了条件等待队列,状态为Condition 】,直到【再次被加入Sync 队列,且状态被修改成Signal】,等到setResponse()线程释放锁 。
 (2)setResponse 线程只有在获取到锁之后,将【等待队列的第一个等待线程移到】->【Sync 队列中】,并将状态修改为【Signal】,完成了通知,释放锁。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息