您的位置:首页 > 编程语言 > Java开发

Java并发包中锁原理剖析(一)

2019-01-14 17:30 330 查看

1. LockSupport 工具类

LockSupport 主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。

LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法是不持有许可证的。LockSupport是使用Unsafe类实现的。下面介绍LockSupport中的几个主要函数。

  1. viod park() 方法

    如果调用park方法的线程已经拿到了与LockSuport关联的许可证,则调LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程调度,就是被阻塞挂起

    /**
    * @author xsh
    * @date 2019/1/4
    * @since 1.0.0
    */
    public class LockSupportTest {
    
    public static void main(String[] args) {
    
    System.out.println("  test   begin  ...   ");
    
    LockSupport.park();
    
    System.out.println("  test   end   ...   ");
    
    }
    
    }

    执行结果:

    test   begin  ...

    另外,在其他线程调用unpark(Thread thread)方法并且当前线程作为参数时,调用park方法被阻塞的线程会返回。如果线程调用了阻塞线程的interrupt()方法,设置了中断标志或者线被虚假唤醒,则阻塞线程也会返回。所以在调用park方法时最好也使用循环条件判断方式。这里需要注意的是,因调用park()方法而被阻塞的线程被其他线程中断返回时不会抛出InterruptedException 异常。

  2. void unpark(Thread thread)方法

    当一个线程调用unpark时,如果参数thread线程没有持有thread与LockSupport类关联的许证,则让thread线程持有。如果thread之前因调用park()而被挂起,则调用unpark后,该线程会被唤醒。如果thread之前没有调用park,则调用unpark方法后,再调用park方法,其会立刻返回。

      /**
      * @author xsh
      * @date 2019/1/14
      * @since 1.0.0
      */
      public class UnparkTest {
      
      public static void main(String[] args) throws Exception {
      
      Thread thread = new Thread(()->{
      
      System.out.println("  running ...  ");
      
      LockSupport.park();
      
      System.out.println("   end  ...  ");
      
      });
      
      thread.start();
      
      TimeUnit.SECONDS.sleep(2);
      
      LockSupport.unpark(thread);
      
      }
      }

      执行结果 :

      running ...
      // 两秒后
      end  ...
    • /**
      * @author xsh
      * @date 2019/1/14
      * @since 1.0.0
      */
      public class UnparkTest {
      
      public static void main(String[] args) throws Exception {
      
      Thread thread = new Thread(()->{
      
      System.out.println("  running ...  ");
      
      try {
      TimeUnit.SECONDS.sleep(2);
      }catch (Exception e){}
      
      LockSupport.park();
      
      System.out.println("   end  ...  ");
      
      });
      
      thread.start();
      
      LockSupport.unpark(thread);
      
      }
      }

      执行结果相同。

    • public class UnparkTest {
      
      public static void main(String[] args) throws Exception {
      
      Thread thread = new Thread(()->{
      
      System.out.println("  running ...  ");
      
      while (!Thread.currentThread().isInterrupted()){
      LockSupport.park();
      }
      
      System.out.println(Thread.interrupted());
      
      System.out.println(  Thread.currentThread().isInterrupted());
      
      System.out.println("   end  ...  ");
      
      });
      
      thread.start();
      
      TimeUnit.SECONDS.sleep(2);
      
      thread.interrupt();
      
      }
      }

    ​ 执行结果相同

  3. void parkNanos(long nanos) 方法 和park方法类似,如果调用park方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.parkNanos(longnanos)方法后会马上返回该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起nanos时间后修改为自动返回。

    另外park方法还支持带有blocker参数的方法voidpark(Object blocker)方法,当钱程在没有持有许可证的情况下调用park方法而被阻塞挂起时,这个blocker对象会被记录到该线程内部。使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getBlocker(Thread)方法来获取blocker对象的,所以JDK推荐我们使用带有blocker参数的park方法,并且blocker被设置为this

    ,这样当在打印线程堆横排查问题时就能知道是哪个类被阻塞了。

    /**
    * @author xsh
    * @date 2019/1/14
    * @since 1.0.0
    */
    public class BlockerTest {
    
    public  void  blockerTest(){
    LockSupport.park(this);
    }
    
    public static void main(String[] args) {
    BlockerTest blockerTest = new BlockerTest();
    blockerTest.blockerTest();
    
    }
    
    }

    使用jstack [pid] 可以看到 :

    “main” #1 prio=5 os_prio=0 tid=0x0000000003023800 nid=0x19b8 waiting on condition [0x0000000002e5f000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000d5ea6378> (a com.juc.principle.locksupport.BlockerTest) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at com.juc.principle.locksupport.BlockerTest.blockerTest(BlockerTest.java:13) at com.juc.principle.locksupport.BlockerTest.main(BlockerTest.java:19)

2).抽象同步队列AQS概述

​ AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。其中Node中的thread变量用来存放进入AQS队列里面的线程:Node节点内部的SHARED来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION(线程在条件队列里面等待〉、PROPAGATE(释放共享资源时需要通知其他节点〕;prev记录当前节点的前驱节点,next记录当前节点的后继节点。

/** 线程被取消了 */
static final int CANCELLED =  1;
/**线程需要被唤醒 */
static final int SIGNAL    = -1;
/** 线程在条件队列里面等待*/
static final int CONDITION = -2;
/**
释放共享资源时需要通知其他节点
*/
static final int PROPAGATE = -3;

​ 在AQS中维持了一个单 一的状态信息state,可以通过getState、setState,compareAndSetState函数修改其值。对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说,state用来表示当前可用信号的个数:对于CountDownlatch来说,state用来表示计数器当前的值。AQS有内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,这个条件队列的头、尾元素分别为自fristWaiter和last Waiter。

​ 对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作

state的方式分为独占方式和共享方式。在独占方式下获取和释放资源使用的方法为:void acquire( int arg) void acquirelnterruptibly(int arg) boolean release( int arg)。在共享方式下获取和释放资源的方法为:void acquireShared(int arg) void acquireSharedinterruptibly(int arg)boolean reaseShared(int arg)。使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReerrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从l变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

​ 对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

  • 在独占方式下,获取与释放资源的流程如下:

      void acquire(int arg) :

      public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }

      调用tryAcquire方法 具体是修改state状态的值,成功返回true ,否则false (由子类实现)。 失败则添加到 waiter 队列中,具体代码见下:

      private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
      }
      enq(node);
      return node;
      }

      新建一个Node ,将模式设为独占模式,在tail 尾节点不为空的时候,将当前节点设置为尾节点并返回 。如果尾节点为空,如下:

      private Node enq(final Node node) {
      for (;;) {
      Node t = tail;
      if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
      tail = head;
      } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
      }
      }
      }
      }

      尾节点为空时,将一个新的节点设为head(哨兵),将tail指向head , 因为是for循环,

      会再次进入,此时tail不为空,将前节点的前驱节点设为tail (此时也等于head) ,将当前节点设置为尾节点,然后将head的next指向当前节点。此时当前节点为尾节点,且首节点的next指向当前节点。

      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

      当前节点的前驱节点为head时,将当前节点设为head,释放哨兵(以上代码的p) ,interrupted为false 不需要selfInterrupt ; 如果不为head是,则LockSupport#park() ,挂起线程。

    1. boolean release(int arg) :

    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

    调用tryRelease方法 具体是修改state状态的值,成功返回true ,否则false (由子类实现)。

    private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }
    将当前节点waitStatus设置为0;取下一个节点,如果waitStatus大于0(canceled)设为null。

    然后从尾节点开始将最前端waitStatus小于0的节点取出,调用LockSupport#unpark方法,唤醒。

  • 在共享方式下,获取与释放资源的流程如下:

      void acquireShared(int arg):

      public final void acquireShared(int arg) {
      if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
      }

      与以上相同tryAcquireShared由子类实现,

      private void doAcquireShared(int arg) {
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
      int r = tryAcquireShared(arg);
      if (r >= 0) {
      setHeadAndPropagate(node, r);
      p.next = null; // help GC
      if (interrupted)
      selfInterrupt();
      failed = false;
      return;
      }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

      大多实现与独占相似。 boolean releaseShared(int arg) 方法也是类似,不再介绍

  • AQS—条件变量的支持

    ​ 正如在基础篇中讲解的,notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁〉实现线程间同步的基础设施。它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。在基础篇中讲解了,在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。

    ​ ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每

    个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意

    这个条件队列和AQS队列不是一回事。

    public final void await() throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }

    ​ 当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。这时候如果有其他线程调用lock.lock()尝试获取锁,就会有-个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该

    线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

    public final void signal() {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first);
    }

    当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。

    ​ 需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数。

    ​ 当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为

    Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条

    件变量对应的条件队列里面。这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。当另外一个线程调用条件变量的signal()或者signa!All()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: