JDK源码——java.util.concurrent(三)
2017-05-02 08:01
411 查看
测试代码:
https://github.com/kevindai007/springboot_houseSearch/tree/master/src/test/java/com/kevindai/juc
由这个例子能看出,Condition的await()方法与Object.wait()方法类似,signal()与Object.notify()类似,其实Condition还有signalAll()方法与Object.notifyAll()类似(用法类似,使用时需要获取锁,调用await()时会释放锁),下面咱们一起看看Condition的源码
Condition是一个接口,主要方法如下
这些方法从字面上很容易理解,在此不做分析,下面咱们看看其实现类,先看await()方法
await方法的大概思想为:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点.如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁.
注意:这里提到了两个队列,一个是Condition中的队列,一个是AQS的队列;
AQS队列是当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空.
Condition队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
用上面的Demo的两个线程来描述
1、首先,线程1调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程1直接获得锁并不会进入AQS同步队列中进行等待。
2、在线程1执行期间,线程2调用lock.lock()时由于锁已经被线程1占用,因此,线程2进入AQS同步队列中进行等待。
3、在线程1中执行condition.await()方法后,线程1释放锁并进入条件队列Condition中等待signal信号的到来。
4、线程2,因为线程1释放锁的关系,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
5、线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒。只是加入到了AQS等待队列中去了
6、待线程2执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程1开始争夺锁(由于此时只有线程1在AQS队列中,因此没人与其争夺),如果获得锁继续执行。
7、直到线程1释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。
下面分析一下signal()方法
doSignal()方法干了两件事:1、修改条件队列中的头结点,1、完成旧的头结点的移出工作,即从Condition队列中移出到AQS同步队列中去。
可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程.
只有到发送signal信号的线程调用reentrantLock.unlock()后,它已经被加到AQS的等待队列中,才可能会被唤醒.
1.支持公平/非公平锁
2.支持响应超时,响应中断
3.支持condition
ReentrantLock实现了Lock接口,内部继承AQS实现这些功能,使用AQS的state来表示锁的重入次数,lock接口如下
看下如何实现AQS的
ReentrantLock的实现又分为两种,公平锁、非公平锁,咱们直接看非公平锁(公平锁前面分析AQS的时候分析过了,http://blog.csdn.net/kevindai007/article/details/70314814):
ReentrantLock释放锁的过程在AQS的分析中都已经分析过了,在此不做重复分析.
ReetrantLock的公平和非公平的区分就是在Acquire的时候,非公平会先直接尝试cas修改,不成功再去排队;而公平锁就是老老实实请求排队操作。
https://github.com/kevindai007/springboot_houseSearch/tree/master/src/test/java/com/kevindai/juc
Condition
首先来看下Condition的简单用法public class ConditionTest { public static void main(String[] args) { final ReentrantLock reentrantLock = new ReentrantLock(); final Condition condition = reentrantLock.newCondition(); new Thread(new Runnable() { @Override public void run() { reentrantLock.lock(); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1开始运行,等待条件执行"); try { condition.await();//与wait()一样,会释放锁 System.out.println("线程1恢复执行"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("线程1释放锁"); reentrantLock.unlock(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try{ reentrantLock.lock(); condition.signal(); System.out.println("线程2开始运行,释放条件"); }finally { System.out.println("线程2释放锁"); reentrantLock.unlock(); } } }).start(); } }
由这个例子能看出,Condition的await()方法与Object.wait()方法类似,signal()与Object.notify()类似,其实Condition还有signalAll()方法与Object.notifyAll()类似(用法类似,使用时需要获取锁,调用await()时会释放锁),下面咱们一起看看Condition的源码
Condition是一个接口,主要方法如下
void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll();
这些方法从字面上很容易理解,在此不做分析,下面咱们看看其实现类,先看await()方法
public final void await() throws InterruptedException { if (Thread.interrupted())//判断当前线程是否被中断 throw new InterruptedException(); //将当前线程作为内容构造的节点node放入到条件队列中并返回此节点 Node node = addConditionWaiter(); //释放当前线程所拥有的锁,返回值为AQS的状态位(即此时有几个线程拥有锁(考虑ReentrantLock的重入)) int savedState = fullyRelease(node); int interruptMode = 0; /* 检测此节点是否在同步队列上,如果不在,说明此线程还没有资格竞争锁,此线程就继续挂起;直到检测到此节点在同步队列上(在什么时候加入的呢?在有线程发出signal信号的时候), */ while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //此线程尝试的获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清理下条件队列中的不是在等待条件的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //报告异常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. /* CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;如果此节点的状态不是CONDITION,则需要将此节点在条件队列中移除 */ if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter;//获取最后一个在等待的节点 } //将此线程作为内容构造一个节点加入到条件队列末尾。 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) {//释放锁 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
await方法的大概思想为:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点.如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁.
注意:这里提到了两个队列,一个是Condition中的队列,一个是AQS的队列;
AQS队列是当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空.
Condition队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
用上面的Demo的两个线程来描述
1、首先,线程1调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程1直接获得锁并不会进入AQS同步队列中进行等待。
2、在线程1执行期间,线程2调用lock.lock()时由于锁已经被线程1占用,因此,线程2进入AQS同步队列中进行等待。
3、在线程1中执行condition.await()方法后,线程1释放锁并进入条件队列Condition中等待signal信号的到来。
4、线程2,因为线程1释放锁的关系,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
5、线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒。只是加入到了AQS等待队列中去了
6、待线程2执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程1开始争夺锁(由于此时只有线程1在AQS队列中,因此没人与其争夺),如果获得锁继续执行。
7、直到线程1释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。
下面分析一下signal()方法
public final void signal() { //检测当前线程是否为拥有锁的独占线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); /* firstWaiter为condition自己维护的一个链表的头结点, 取出第一个节点后开始唤醒操作 */ Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { //修改头结点,完成旧头结点的移出工作 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal()方法干了两件事:1、修改条件队列中的头结点,1、完成旧的头结点的移出工作,即从Condition队列中移出到AQS同步队列中去。
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果不能改变等待状态,那么当前节点被取消,return false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点 Node p = enq(node); int ws = p.waitStatus; //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程.
只有到发送signal信号的线程调用reentrantLock.unlock()后,它已经被加到AQS的等待队列中,才可能会被唤醒.
ReentrantLock
ReentrantLock可重入锁,与synchronized类似,但更方便灵活,可作为替代使用:1.支持公平/非公平锁
2.支持响应超时,响应中断
3.支持condition
ReentrantLock实现了Lock接口,内部继承AQS实现这些功能,使用AQS的state来表示锁的重入次数,lock接口如下
看下如何实现AQS的
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); //非公平锁tryAcquire final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判断state是否被占用 if (c == 0) {//未被占用则cas占用,并设置当前线程为占用线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//如果被占用,则判断是否是当前线程占用的(锁可重入),如果是就把state+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //释放资源 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread())//如果当前线程不是占用线程,则报错 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//状态-1后如果资源未被占用,则没有占用线程 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //当前线程是否是占用线程 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } //获取Codition,conditionObject维护一个条件队列,见上篇 final ConditionObject newCondition() { return new ConditionObject(); } //获取占用线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
ReentrantLock的实现又分为两种,公平锁、非公平锁,咱们直接看非公平锁(公平锁前面分析AQS的时候分析过了,http://blog.csdn.net/kevindai007/article/details/70314814):
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { //直接尝试获取资源(不管有没有线程在条件队列中等待) if (compareAndSetState(0, 1)) //获取成功则把当前线程设置为占有线程 setExclusiveOwnerThread(Thread.currentThread()); else //如果未成功则加入等待队列(加入等待队列的线程是依次被唤醒的) acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
ReentrantLock释放锁的过程在AQS的分析中都已经分析过了,在此不做重复分析.
ReetrantLock的公平和非公平的区分就是在Acquire的时候,非公平会先直接尝试cas修改,不成功再去排队;而公平锁就是老老实实请求排队操作。
相关文章推荐
- JDK源码——java.util.concurrent(二)
- JDK源码——java.util.concurrent(四)
- JDK源码——java.util.concurrent(七)
- JDK源码(线程池ThreadPoolExecutor)——java.util.concurrent(九)
- Jdk源码阅读之Java.util.concurrent
- JDK源码(FutureTask)——java.util.concurrent(十)
- Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
- JDK源码——java.util.concurrent(八)
- JDK源码——java.util.concurrent(五)
- JDK源码——java.util.concurrent
- JDK源码——java.util.concurrent(六)
- 《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包
- java类库的阅读笔记_jdk1.7.0_40_java.util.concurrent.locks.LockSupport
- java类库的阅读笔记_jdk1.7.0_40_java.util.concurrent.ConcurrentHashMap
- Java concurrent Framework并发容器之ConcurrentHashMap(Doug Lea 非JDK版)源码分析
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- 《java.util.concurrent 包源码阅读》05 BlockingQueue
- 【jdk源码解析三】java.util.Hashtable
- JDK源码分析——Java.util.Vector的浅析
- 关于JDK中的java.util.concurrent.atomic