高并发学习笔记(四)
2019-03-22 19:01
78 查看
一、同步器
1.AbstractQueuedSynchronizer(AQS)
AQS
提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架(后面要学的CountDownLatch,ReentrantLock均基于此实现的
)。
AQS的继承关系如下图:
其中AbstractOwnableSynchronizer是可以有线程独占方式拥有的同步器:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { //构造方法 protected AbstractOwnableSynchronizer() { } //独占模式下,拥有同步器的线程 private transient Thread exclusiveOwnerThread; // 设置当前拥有独占访问的线程 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } //返回最后拥有独占访问的线程 protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }看完父类中的方法,再来看看AQS中的实现,AQS的底层是基于双向链表实现的一个队列,队列就是用来实现线程并发访问控制的,结点的数据结构如下图: Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表 ,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue( Condition 后面会讲,暂时可先放一边)。
//同步队列中每个被阻塞的线程都会被封装成一个Node结点 static final class Node { //结点模式,分为独占和共享 //共享模式 static final Node SHARED = new Node(); //独占模式 static final Node EXCLUSIVE = null; //每个结点的状态对应有5个 //waitStatus=0表示当前结点在同步队列中,等待获取锁 //CANCELLED表示线程被取消的状态 static final int CANCELLED = 1; //SIGNAL表示当前结点及后继结点中的线程都需要唤醒 static final int SIGNAL = -1; //CONDITION表示当前结点在等待condition static final int CONDITION = -2; //PROPAGATE表示唤醒操作会被传播给后续节点,仅在共享模式下有用 static final int PROPAGATE = -3; //结点状态 volatile int waitStatus; //前驱结点 volatile Node prev; //后继结点 volatile Node next; //结点对应的线程 volatile Thread thread; //下一个等待结点 Node nextWaiter; //队列是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取前驱结点,为null抛异常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //无参构造方法,用于建立初始头或SHARED标记 Node() { } //mode为null表示建立独占模式的队列,mode=SHARED表是建立共享模式的队列 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } //传入状态的构造方法 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }知道了父类的作用和结点类的构成,再来看看AQS的功能,由父类及结点类可知道AQS的功能分为两种:独占及共享。 独占锁模式下,每次只能有一个线程能持有锁,ReentrantLock就是以独占方式实现的互斥锁。共享锁,则允许多个线程同时获取锁,并发访问、共享资源,如:ReadWriteLock。 显然,独占锁是一种悲观锁,它将读/读冲突都避免了,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。 2.独占模式 AQS提供了两套模板来分别实现独占锁即共享锁,其中实现独占锁 功能的子类必须实现 tryAcquire、tryRelease、isHeldExclusively等方法;实现 共享锁功能的子类,必须实现tryAcquireShared和tryReleaseShared等方法,带有Shared后缀的方法都是支持共享锁加锁的语义。
//AQS源码中,独占锁及共享锁需要实现的五个方法 //尝试在独占模式下获取锁 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //尝试在独占模式下释放锁 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //当前线程是否正在独占锁。只有用到condition才需要去实现它 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } //尝试在共享模式下获取锁 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //尝试在共享模式下释放锁 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //AQS中的重要属性及方法 //线程同步队列中的头结点 private transient volatile Node head; //线程等待获取锁时的自旋最长时间, static final long spinForTimeoutThreshold = 1000L; //线程同步队列的尾结点 private transient volatile Node tail; //表示AQS的状态,根据体积实现有不同含义(CountDownLatch中表示门闩上锁的数量) private volatile int state; //偏移量及unsafe用于CAS更新操作 private static final Unsafe unsafe = Unsafe.getUnsafe(); //JDK底层工具类 private static final long stateOffset; //AQS状态的内存地址偏移量 private static final long headOffset; //头结点的内存偏移量 private static final long tailOffset; //尾结点的内存偏移量 private static final long waitStatusOffset; //结点状态的内存偏移量 private static final long nextOffset; //下个结点的内存偏移量 //获取状态 protected final int getState() { return state; } //设置状态 protected final void setState(int newState) { state = newState; } //CAS方式更新状态值 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } //CAS方式更新头结点 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } //CAS方式更新尾结点 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } //CAS方式更新结点状态 private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } //CAS方式更新下个结点 private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }下面以独占模式下获取和释放锁以及共享模式下获取和释放锁来看看AQS的源码是怎么实现的。 首先,独占模式下获取锁:
//AQS中独占模式下获取锁的方法 //以独占模式获取对象,忽略中断 public final void acquire(int arg) { //尝试获取锁,若失败,则将当前线程封装成一个Node结点加入同步队列中,并调用acquireQueued方法来监视同步队列以获取锁资源 //acquireQueued会阻塞知道获取到锁,且acquireQueued方法不会响应中断 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //若阻塞等待获取锁期间发生中断操作,则selfInterrupt方法会重置中断标识,进行自我中断。 } //以共享或独占方式新增等待结点 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //获取当前尾结点 if (pred != null) { //判断队列是否初始化,即为结点是否为null node.prev = pred; //新增等待结点的前驱结点为当前尾结点 if (compareAndSetTail(pred, node)) { /CAS方式更新尾结点,失败的话会在enq(node)方法中继续尝试插入 pred.next = node; //旧尾结点的后继结点为新尾结点。 return node; //返回新增结点 } } enq(node); //同步队列初始化或继续插入结点到末尾 return node; //返回插入的结点 } //结点入队方法,也含有同步队列初始化的功能 private Node enq(final Node node) { for (;;) { //无线循环,保证结点插入成功 Node t = tail; //获取尾结点 if (t == null) { //判断队列是否初始化过 //尾结点为空说明队列还没初始化,队列中一个结点都没有 if (compareAndSetHead(new Node())) //以CAS方式生成队列头结点,头结点中没有线程仅有下个结点的指针 tail = head; } else { //已初始化,队列不为空 node.prev = t; //将当前尾结点设置为新的尾结点的前驱结点 if (compareAndSetTail(t, node)) { //以CAS方式更新尾结点 t.next = node; //旧尾结点的后继结点为新尾结点。 return t; //返回旧尾结点 } } } } //获取不到锁,将结点加入同步等待队列阻塞知道获取到锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //表示获取锁是否失败,true为失败,false为没有失败 try { boolean interrupted = false; //标识等待锁的期间是否又被打断过,false标识没有,true表示被打断过 for (;;) { //循环等待 final Node p = node.predecessor(); //获取node的前结点p //判断p结点是否为队列的头结点,若p是头结点则再次尝试获取锁 if (p == head && tryAcquire(arg)) { //node的前驱是头结点,且node对应的线程获取到锁就将node设为队列的头结点 setHead(node); p.next = null; //设置p的next为null,方便GC回收 failed = false; //标识没有失败 return interrupted; } //获取锁失败,head结点与node之间还有等待线程或者head结点的线程正在使用资源 //判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; //到这表示线程必然被中断过 } } finally { if (failed) //获取锁失败,正常情况不会失败,而是一直等待获取锁,除非除了异常导致自旋被打断 cancelAcquire(node); //失败后要取消该结点 } } //在获取失败后是否应该停顿下来 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //获取pred的状态 //判断pred的状态为SIGNAL,若是,直接返回true if (ws == Node.SIGNAL) return true; //判断pred是否被取消 if (ws > 0) { do { //往前查找pred前驱结点中第一个尚未被取消的结点作为node的前驱结点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0);的后继结点改为node pred.next = node; //pred } else { //尝试以CAS方式更新pred结点的状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //调用LockSupport的静态方法park阻塞当前线程,直到被中断,这次中断会被acquireQueued记录,但不会立即响应,直到自旋完成 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //为了线程调度,禁用当前线程,除非许可可用 return Thread.interrupted(); //返回当前线程是否被中断 } //以CAS方式尝试更新结点的状态 private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update); //通过unsafe类实现原子操作更新 } //设置同步队列的头结点,头结点必须不能拥有线程及前驱结点为null private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //进行自我中断 static void selfInterrupt() { Thread.currentThread().interrupt(); } //取消获取锁操作 private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; //获取前驱结点 //查找在node之前第一个尚未被取消的结点为node的前驱结点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //记录pred结点的后继结点 node.waitStatus = Node.CANCELLED; //将node状态设置为CANCELLED //判断node结点是否为尾结点,若为尾结点则尝试以CAS方式将pred设为新的尾结点 if (node == tail && compareAndSetTail(node, pred)) { //成功将pred更新为新的尾结点,再尝试以CAS方式更新pred的后继为null compareAndSetNext(pred, predNext, null); } else { int ws; //判断((pred结点是否Wie头结点且pred的状态是否是SIGNAL)或(pred结点未被取消且尝试更新pred结点状态为SIGNAL成功))且pred结点下的线程不为null if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; //获取node的后继结点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); //以CAS方式设置pred的后继结点为next } else { unparkSuccessor(node); } node.next = node; // help GC } } //释放node的后继结点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; //获取结点状态 if (ws < 0) //判断结点状态小于0 //若结点状态不处于等待获取锁状态,就以CAS方式更新结点状态为等待获取锁状态 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //获取结点的后继结点 //判断后继结点是否为空,或已被取消,结点的status只有取消状态的值>0 if (s == null || s.waitStatus > 0) { s = null; //从尾结点开始往前遍历队列,直到node结点时结束 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //查找node结点之后,最靠近node结点且未被取消的结点 s = t; } if (s != null) //s不为null,释放s结点中的线程 LockSupport.unpark(s.thread); } //以独占模式获取对象,如果被中断则中止 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) //判断线程是否被中断过 throw new InterruptedException(); if (!tryAcquire(arg)) //判断尝试获取锁是否成功 doAcquireInterruptibly(arg); //不成功进入该方法 } //真正执行可被中断的等待获取锁的方法 private void doAcquireInterruptibly(int arg)throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); //增加一个独占模式的结点到同步队列中 boolean failed = true; //标识锁获取成功与否的标志 try { for (;;) { final Node p = node.predecessor(); //获取node结点的前驱结点 //判断结点p是否为头结点,且再次尝试获取锁 if (p == head && tryAcquire(arg)) { //获取到锁,将node结点设为头结点(获取到锁说明所有在node之前的结点均已执行完) setHead(node); p.next = null; // help GC failed = false; return; } //获取锁失败,head结点与node之间还有等待线程或者head结点的线程正在使用资源 //判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); //直接抛出中断异常,与acquireQueued方法区别 } } finally { if (failed) //因异常而导致获取锁失败,node结点设为取消状态 cancelAcquire(node); } } //试图以独占模式获取对象,如果被中断则中止,如果到了给定超时时间,则会失败 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); //尝试获取锁, } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) //判断超时时间是否合法 return false; final long deadline = System.nanoTime() + nanosTimeout; //获取超时时间戳,即截止时间 final Node node = addWaiter(Node.EXCLUSIVE); //独占模式增加等待结点 boolean failed = true; //标识获取锁是否成功 try { for (;;) { final Node p = node.predecessor(); //获取node的前驱结点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //获取锁失败,head结点与node之间还有等待线程或者head结点的线程正在使用资源 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //超时时间不正确,直接返回 return false; //判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作 //且若是超时时间小于自旋最大时间,就使用自旋,否则再进行park操作 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //park阻塞当前线程一段时间 if (Thread.interrupted()) throw new InterruptedException(); //线程被中断,抛异常 } } finally { if (failed) //因异常而导致获取锁失败,node结点设为取消状态 cancelAcquire(node); } }独占模式下的流程图(图片来源 https://www.geek-share.com/detail/2714157501.html )如下: 独占模式下释放锁: //独占模式下释放锁 public final boolean release(int arg) { if (tryRelease(arg)) { //判断尝试释放锁是否成功 Node h = head; //获取同步队列头结点 //头结点是否为null,若不为null且头结点状态是SIGNAL执行unparkSuccessor方法 //(独占模式下,waitStatus!=0与waitStatus==-1等价,这里waitStatus不会为CANCELLED,因为已经获取资源了) if (h != null && h.waitStatus != 0) unparkSuccessor(h); //释放头结点的后继结点 return true; //成功释放锁 } return false; } 3.共享模式 共享模式下获取锁:
//共享模式下获取锁,忽略中断 public final void acquireShared(int arg) { //tryAcquireShared的返回值表示剩余资源个数,负值表示获取失败,0表示获取成功但已无剩余资源 if (tryAcquireShared(arg) < 0) //共享模式下尝试获取锁,返回值大于0表示获取到锁 doAcquireShared(arg); // } //共享模式下不可被中断的等待获取锁 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //将当前线程以共享模式加入同步等待队列 boolean failed = true; //标识获取锁是否成功 try { boolean interrupted = false; //标识是否被中断 for (;;) { final Node p = node.predecessor(); //获取前驱结点p if (p == head) { //判断p是否是头结点,即前驱是头结点才能尝试获取锁 int r = tryAcquireShared(arg); //尝试获取锁 //r > 0表示获取锁成功 if (r >= 0) { //设置node为新的头结点,并唤醒node后续为共享模式的结点(若后继结点有独占,则唤醒在独占出结束) setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //判断档期啊线程是否要挂起,即park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //检查并试图唤醒之后的等待线程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //记录旧的头结点信息 setHead(node); //设置node为新的头结点,即将获取锁成功的结点设为头结点 //什么情况下可以去唤醒后继结点 //propagate>0表示还有锁资源可以被获取,可以唤醒后继结点 //h==null旧头结点为null,可以去唤醒后继结点(这条貌似不可能出现,不清楚意义在哪) //head==null,头结点可能已经被其他线程结点替换,导致node后继为null /结点h的状态为SIGNAL或PROPAGATE if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //获取node后继结点 Node s = node.next; //s为null或s是共享模式时执行doReleaseShared方法,即传播唤醒行为给后继结点 if (s == null || s.isShared()) doReleaseShared(); } } //唤醒后继结点去尝试获取锁 private void doReleaseShared() { for (;;) { //循环唤醒所有共享模式的结点 Node h = head; //获取当前头结点 //判断头结点是否为null,或头结点是否与尾结点是同一结点 //即同步队列中的等待结点数要大于1个 if (h != null && h != tail) { int ws = h.waitStatus; //获取头结点状态 //判断头结点的状态,若为SIGNAL表示需要变为等待状态 if (ws == Node.SIGNAL) { //判断进行CAS原子操作更新h为等待状态是否成功,不成功继续循环更新知道成功为止 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); //成功更新h的状态后,释放h的后继结点(即唤醒后继结点中的线程) }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //若结点状态为0,则尝试更新结点状态为PROPAGATE,失败就一直尝试到成功为止 continue; } if (h == head) //h == head时,表示没有共享结点需要被唤醒,或者所有的共享结点都已经被唤醒完了 break; //退出循环 } } //共享模式下获取锁,如果被中断则中止 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取锁 doAcquireSharedInterruptibly(arg); //获取锁失败,进入等待并继续尝试获取锁 } //共享模式下可被中断的等待获取锁,与doAcquireShared基本相同,只是可被中断 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //将等待线程封装成共享模式结点加入等待队列末尾 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); //被中断直接抛异常,并取消结点 } } finally { if (failed) cancelAcquire(node); } } //共享模式下获取锁,如果被中断则中止,如果到了给定的超时时间,则会失败 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } //在一定时间内尝试获取锁,超时则失败 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) //判断超时时间是否合法 return false; final long deadline = System.nanoTime() + nanosTimeout; //超时时间对应的时间戳 final Node node = addWaiter(Node.SHARED); //以共享模式加入等待同步的队列 boolean failed = true; //尝试获取锁操作是否成功的标识 try { for (;;) { final Node p = node.predecessor(); //获取前驱结点 if (p == head) { //判断前驱是否为头结点,当p是头结点才能尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) { //同上 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; //判断超时时间是否超过自旋最长时间,超过就让结点线程禁用(等待唤醒后在解除禁用)。 //没超过就自旋等待获取锁 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) //获取锁失败,取消结点 cancelAcquire(node); } }
共享模式下尝试获取锁的过程如下图(图片来源 https://www.geek-share.com/detail/2715109349.html): 共享模式下释放锁:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //尝试释放锁 doReleaseShared(); return true; } return false; }二、条件Condition 1.Condition接口 我们知道任意一个Java对象都有wait和notify、notifyAll方法,可用于synchronized配合使用(如生产者消费者模式),实现线程的等待/通知。因而为了配合同步器实现类也实现与wait和notify相类似的功能,jdk提供了Condition接口:
public interface Condition { //Object类中wait方法类似,使当前线程在收到信号之前进入等待状态 //与此 Condition 相关的锁以原子方式释放,并且出于线程调度的目的,将禁用当前线程,且在发生以下四种情况之一 以前,当前线程将一直处于休眠状态: //其他某个线程调用此 Condition 的 signal() 方法,并且碰巧将当前线程选为被唤醒的线程;或者 //其他某个线程调用此 Condition 的 signalAll() 方法;或者 //其他某个线程中断当前线程,且支持中断线程的挂起;或者 //发生“虚假唤醒”(指唤醒时唤醒了多个等待线程,导致结果不正确) void await() throws InterruptedException; //使当前线程在收到信号之前进入等待状态,但不能被中断 void awaitUninterruptibly(); //线程进入等待状态nanosTimeout纳秒的时间,超时自动唤醒 long awaitNanos(long nanosTimeout) throws InterruptedException; //线程进入等待状态timeout时间,超时自动唤醒 boolean await(long time, TimeUnit unit) throws InterruptedException; //线程在到达指定期限前进入等待状态 boolean awaitUntil(Date deadline) throws InterruptedException; //唤醒其中某一个线程 void signal(); //唤醒所有等待线程 void signalAll(); }
其典型示例,生产者消费者模式的代码:
/** * 生产者消费者模式的condition实现 * Created by bzhang on 2019/3/18. */ public class TestCondition { //ReentrantLock重入锁,是AQS同步器独占模式的一个实现,后续会学习 private ReentrantLock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); private Queue<Object> queue = new LinkedList<>(); public void make(){ while (true){ lock.lock(); try { while (queue.size()>=5){ try { System.out.println("满了满了,放不下了!"); producer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } queue.offer(new Object()); System.out.println(Thread.currentThread().getName()+"生产了一个产品。库存有"+queue.size()); consumer.signalAll(); }finally { lock.unlock(); } } } public void take(){ while (true) { lock.lock(); try { while (queue.size() <= 0) { try { System.out.println("完了完了,没东西了!"); consumer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } queue.poll(); System.out.println(Thread.currentThread().getName() + "消费了一个产品。库存有" + queue.size()); producer.signalAll(); } finally { lock.unlock(); } } } public static void main(String[] args) { TestCondition test = new TestCondition(); new Thread(new Runnable() { @Override public void run() { test.make(); } }).start(); new Thread(new Runnable() { @Override public void run() { test.take(); } }).start(); } }2.Condition的实现类 Condition的实现类是ConditionObject,ConditionObject是AQS同步器的内部类,Condition的操作需要获取相关联的锁,因此需要和同步器挂钩。每个Condition对象都包含着一个队列(等待队列),Condition中也有结点的概念,在将线程放到等待队列中时会构造结点,而这个结点的定义其实是复用了同步器中节点的定义。
public class ConditionObject implements Condition, java.io.Serializable { //条件等待队列中的队首 private transient Node firstWaiter; //条件等待队列的队尾 private transient Node lastWaiter; //在退出等待时重置中断状态 private static final int REINTERRUPT = 1; //退出等待时抛出中断异常 private static final int THROW_IE = -1; //空构造 public ConditionObject() { } //等待方法的实现 //await会将挂起的node结点加入等待队列,当被唤醒后尝试加入同步队列等待获取锁 //获取到锁后继续执行后续逻辑 public final void await() throws InterruptedException { if (Thread.interrupted()) //判断线程是否被中断 throw new InterruptedException(); Node node = addConditionWaiter(); //新增等待结点 int savedState = fullyRelease(node); //返回完全释放node结点的结果 int interruptMode = 0; //判断node是否在同步队列中,若不在就将其挂起 while (!isOnSyncQueue(node)) { LockSupport.park(this); //挂起当前线程 //判断挂起线程期间是否被中断过,0为没被中断过 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //到这说明结点已经被唤醒了,尝试获取独占锁 //当node结点获取到锁后,且被中断过 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) //若node的后继等待结点不为空,清除等待队列中被取消的结点 unlinkCancelledWaiters(); if (interruptMode != 0) //判断是否在等待挂起期间被中断过 reportInterruptAfterWait(interruptMode); //根据中断模式决定后续中断行为,抛异常或是重置中断状态 } //新增等待结点,向队尾新增 private Node addConditionWaiter() { Node t = lastWaiter; //获取当前队尾结点 //判断尾结点是否为null,且状态是否是CONDITION //即判断尾结点是否被取消 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); //清除队列中状态为被取消的结点 t = lastWaiter; //获取新的尾结点 } Node node = new Node(Thread.currentThread(), Node.CONDITION); //以当前线程创建新的等待结点 if (t == null) //判断队列是否为空 firstWaiter = node; //空队列新增的就是队首结点 else t.nextWaiter = node; //否则新增的就为队尾 lastWaiter = node; return node; } //清除等待队列中所有被取消的结点 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } //该方法是AQS中的方法,释放node结点 final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); //获取AQS状态 if (release(savedState)) { //判断该以独占模式释放锁是否成功 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) //释放失败,该结点被取消 node.waitStatus = Node.CANCELLED; } } //判断结点是否在同步队列中 //结点的的状态等于CONDITION或者前驱结点pre为空,则不表示不在同步队列中 //结点的后继不为null则表示在同步队列中(只有在同步队列中才会使用pre和next,等待队列WaitQueue没有) final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //从尾结点往前遍历查找同步队列中是否有结点node } //判断同步队列中是否含有node结点(从后往前找) private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } //检查结点在等待期间是否被中断过 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //若有必要取消等待,将结点转移到同步队列 final boolean transferAfterCancelledWait(Node node) { //判断更新结点状态是否成功,成功将结点状态由CONDITION更新为0则转移结点到同步队列 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //成功更新结点为等待状态,将其加入同步队列 enq(node); return true; } //不在同步队列中则让出CPU资源 while (!isOnSyncQueue(node)) Thread.yield(); return false; } //根据中断模式决定node被中断后是抛异常还是直接重置中断状态 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); //抛异常 else if (interruptMode == REINTERRUPT) selfInterrupt(); //重置 } //不可被中断的等待方法。与await差别仅仅是不会被中断 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } //等待nanosTimeout纳秒,可被中断 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) //判断是否被中断 throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; //等待到什么时间 int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { //等待时间不正确直接尝试转移结点进同步队列,结束等待 transferAfterCancelledWait(node); break; } //等待时间不超过最大自旋时间就不挂起,直接自旋等待,超过则挂起 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //确认等待期间是否被中断过,且中断模式是什么 break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); //返回值大于0说明等待时间没有用完,小于等于0表示等待时间用完 } //最多等待到deadline的时间 public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); //获取时间戳 if (Thread.interrupted()) //判断是否被中断过 throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); //挂起当前线程abstime时间 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } //与awaitNanos相同,仅仅时间单位可能不同 public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } //唤醒某一线程(即将其从等待队列中移到同步队列去竞争获取锁) //默认是将等待最久的线程移动到同步队列(但不绝对,因为可能不存在等待最久的线程) public final void signal() { //判断当前要唤醒的线程是否是独占模式 //表名Condition只能使用独占模式,不能使用共享模式 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //获取等待队列的队首结点 if (first != null) doSignal(first); //唤醒队首结点 } //真正执行唤醒队首结点的方法 private void doSignal(Node first) { do { //判断等待队列是否为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //要唤醒的结点的下个结点赋空,因为要移出等待队列了 } while (!transferForSignal(first) && //当唤醒失败且存在下个结点时,继续唤醒,直到有个线程被唤醒为止 (first = firstWaiter) != null); } //唤醒并将结点转入同步队列 final boolean transferForSignal(Node node) { //尝试以CAS方式更新结点状态为0,失败就直接返回 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); //将结点插入同步队列中 int ws = p.waitStatus; //若结点为被取消状态或更新成SIGNAL值失败,就释放结点中的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } //唤醒所有线程(即将所以等待队列中的结点移动到同步队列中) public final void signalAll() { if (!isHeldExclusively()) //判断是否是独占模式 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //判断等待队列是否存在 doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); //循环唤醒保证所有结点均被唤醒 } 判断该条件队列是否是由sync创建的 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } //等待队列中是否还有等待结点 protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } //返回等待队列中的等待结点数,是个估计值 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } //返回包含那些可能正在等待此条件的线程 collection protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
OK,到这Condition接口就分析完了。
相关文章推荐
- Go语言并发与并行学习笔记(一)
- 学习笔记(九)并发(三)
- Go语言并发与并行学习笔记(一)
- Java并发学习笔记(八)-LinkedBlockingQueue
- Go语言学习笔记-并发
- 并发编程学习笔记之Lock与synchronized
- Java并发学习笔记(15)信号量(Semaphore) 关卡((2)CyclicBarrier)
- 学习笔记:java并发编程学习之初识Concurrent
- JAVA并发编程学习笔记之ReentrantLock (r)
- MySQL学习笔记之四:并发控制和事务机制
- Java并发编程学习笔记 深入理解volatile关键字的作用
- 学习JAVA多线程编程 --- 《JAVA多线程编程核心技术》第2章 对象及变量的并发访问 笔记
- WCF学习笔记之并发与限流
- Java并发读书学习笔记(九)——性能与可伸缩性
- Java并发读书学习笔记(十)——显式锁
- Java学习笔记—多线程(并发工具类)
- Java并发读书学习笔记(十一)——原子变量与非阻塞同步机制
- 并发编程实战学习笔记(七)——避免活跃性问题
- 并发编程实战学习笔记(八)——性能与可伸缩性
- 并发学习笔记(三):join与wait/notify