您的位置:首页 > 编程语言 > Java开发

JDK源码——java.util.concurrent(三)

2017-05-02 08:01 411 查看
测试代码:

https://github.com/kevindai007/springboot_houseSearch/tree/master/src/test/java/com/kevindai/juc

Condition

首先来看下Condition的简单用法

public class ConditionTest {
public static void main(String[] args) {
final ReentrantLock reentrantLock = new ReentrantLock();
final Condition condition = reentrantLock.newCondition();

new Thread(new Runnable() {
@Override
public void run() {

reentrantLock.lock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1开始运行,等待条件执行");
try {
condition.await();//与wait()一样,会释放锁
System.out.println("线程1恢复执行");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("线程1释放锁");
reentrantLock.unlock();
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try{
reentrantLock.lock();
condition.signal();
System.out.println("线程2开始运行,释放条件");
}finally {
System.out.println("线程2释放锁");
reentrantLock.unlock();
}
}
}).start();

}
}


由这个例子能看出,Condition的await()方法与Object.wait()方法类似,signal()与Object.notify()类似,其实Condition还有signalAll()方法与Object.notifyAll()类似(用法类似,使用时需要获取锁,调用await()时会释放锁),下面咱们一起看看Condition的源码

Condition是一个接口,主要方法如下

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();


这些方法从字面上很容易理解,在此不做分析,下面咱们看看其实现类,先看await()方法

public final void await() throws InterruptedException {
if (Thread.interrupted())//判断当前线程是否被中断
throw new InterruptedException();
//将当前线程作为内容构造的节点node放入到条件队列中并返回此节点
Node node = addConditionWaiter();
//释放当前线程所拥有的锁,返回值为AQS的状态位(即此时有几个线程拥有锁(考虑ReentrantLock的重入))
int savedState = fullyRelease(node);
int interruptMode = 0;
/*
检测此节点是否在同步队列上,如果不在,说明此线程还没有资格竞争锁,此线程就继续挂起;直到检测到此节点在同步队列上(在什么时候加入的呢?在有线程发出signal信号的时候),
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此线程尝试的获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//报告异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}


private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
/*
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;如果此节点的状态不是CONDITION,则需要将此节点在条件队列中移除
*/
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;//获取最后一个在等待的节点
}
//将此线程作为内容构造一个节点加入到条件队列末尾。
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}


final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {//释放锁
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}


await方法的大概思想为:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点.如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁.

注意:这里提到了两个队列,一个是Condition中的队列,一个是AQS的队列;

AQS队列是当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空.

Condition队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

用上面的Demo的两个线程来描述

1、首先,线程1调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程1直接获得锁并不会进入AQS同步队列中进行等待。

2、在线程1执行期间,线程2调用lock.lock()时由于锁已经被线程1占用,因此,线程2进入AQS同步队列中进行等待。

3、在线程1中执行condition.await()方法后,线程1释放锁并进入条件队列Condition中等待signal信号的到来。

4、线程2,因为线程1释放锁的关系,会唤醒AQS队列中的头结点,所以线程2会获取到锁。

5、线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒。只是加入到了AQS等待队列中去了

6、待线程2执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程1开始争夺锁(由于此时只有线程1在AQS队列中,因此没人与其争夺),如果获得锁继续执行。

7、直到线程1释放锁整个过程执行完毕。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

下面分析一下signal()方法

public final void signal() {
//检测当前线程是否为拥有锁的独占线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
/*
firstWaiter为condition自己维护的一个链表的头结点,
取出第一个节点后开始唤醒操作
*/

Node first = firstWaiter;
if (first != null)
doSignal(first);
}


private void doSignal(Node first) {
do {
//修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}


doSignal()方法干了两件事:1、修改条件队列中的头结点,1、完成旧的头结点的移出工作,即从Condition队列中移出到AQS同步队列中去。

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//如果不能改变等待状态,那么当前节点被取消,return false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}


可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程.

只有到发送signal信号的线程调用reentrantLock.unlock()后,它已经被加到AQS的等待队列中,才可能会被唤醒.

ReentrantLock

ReentrantLock可重入锁,与synchronized类似,但更方便灵活,可作为替代使用:

1.支持公平/非公平锁

2.支持响应超时,响应中断

3.支持condition

ReentrantLock实现了Lock接口,内部继承AQS实现这些功能,使用AQS的state来表示锁的重入次数,lock接口如下



看下如何实现AQS的

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

//非公平锁tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判断state是否被占用
if (c == 0) {//未被占用则cas占用,并设置当前线程为占用线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果被占用,则判断是否是当前线程占用的(锁可重入),如果是就把state+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//释放资源
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())//如果当前线程不是占用线程,则报错
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//状态-1后如果资源未被占用,则没有占用线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//当前线程是否是占用线程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
//获取Codition,conditionObject维护一个条件队列,见上篇
final ConditionObject newCondition() {
return new ConditionObject();
}
//获取占用线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}


ReentrantLock的实现又分为两种,公平锁、非公平锁,咱们直接看非公平锁(公平锁前面分析AQS的时候分析过了,http://blog.csdn.net/kevindai007/article/details/70314814):

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
//直接尝试获取资源(不管有没有线程在条件队列中等待)
if (compareAndSetState(0, 1))

//获取成功则把当前线程设置为占有线程
setExclusiveOwnerThread(Thread.currentThread());
else
//如果未成功则加入等待队列(加入等待队列的线程是依次被唤醒的)
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}


ReentrantLock释放锁的过程在AQS的分析中都已经分析过了,在此不做重复分析.

ReetrantLock的公平和非公平的区分就是在Acquire的时候,非公平会先直接尝试cas修改,不成功再去排队;而公平锁就是老老实实请求排队操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: