您的位置:首页 > 编程语言 > Java开发

Java并发包中锁原理剖析(二)

2019-01-17 17:05 411 查看

1). 独占锁ReentrantLock

​ ReentrantLock 拥有内部抽象类Sync ,Sync 直接继承自AQS,它的子类NonfairSync 和FairSync 分别实现了非公平锁和公平锁策略。AQS 的state状态值代表的是线程获取锁的重入次数,默认情况下,state为0 表示当前锁没有被任何线程持有,当有线程第一次获取锁时会使用CAS设置state的值为1 成功后会将锁持有者设置为此线程。在此线程没有释放锁情况下,再次获取锁,会将state的值加一,这就是重入次数。在此线程释放锁时,会使用CAS将state值减一,如果减一后state的值为0,则此线程释放该锁。

  • public void lock():

    public void lock() {
    sync.lock();
    }

    调用了sync#lock方法 ,这里我们看NonfairSync的实现:

    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

    先尝试将state值设置为1,成功了将锁的持有者设为当前线程(一般为第一次获取锁时),否则调用AQS#acquire(1)(重入锁时)。 我们在上一篇已经介绍了AQS#acquire(int arg) ,其改变state的实现在tryAcquire(int arg)方法。我们看看nonfairTryAcquire(int arg)方法:

    final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state值
    int c = getState();
    // 如果为 0 ,则将state 设为 acquires ,再将锁的持有者设为当前线程
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 不为 0 并且当前线程是锁的持有者, 将当前的state加上acquires并设置为新的state
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    具体的逻辑都写了注解。再看FairSync 的实现:

    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    主要差别再hasQueuedPredecessors方法:

    public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    在如上代码中,如果当前线程节点有前驱节点则返回住时,否则如果当前AQS队列为空或者当前线程节是AQS的第一个节点则返回false。其中如果ht则说明当前队列为空,直接返回false;如果h!=t并且snull则说明有一个元素将要作为AQS的第一个节点入队列(回顾上一节的内容,enq函数的第一个元素入队列是两步操作:首先创建一个哨兵头节点,然后将第一个元素插入哨兵节点后面,那么返回true,如果h!=t并且s!=null和s.thread!= Thread.cunentThread()则说明队列里面的第一个元素不是当前线程,那么返回true。这里可以知道,公平锁的机制是,阻塞队列前如果有线程在等待,则阻塞当前线程,类似于排队。

  • public void lockInterruptibly() throws InterruptedException

    public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
    }
    public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
    }

    lockInterruptibly方法会响应中断,当前线程在调用该方法时,如果其他线程调用了当前线程的interrupt方法,则会抛出InterruptedException异常。

  • public boolean tryLock() :

    public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
    }

    直接调用的非公平策略

  • public boolean tryLock(long timeout, TimeUnit unit)

    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquire(arg) ||
    doAcquireNanos(arg, nanosTimeout);
    }

    在指定时间没有获取锁,即返回。

  • public void unlock()

    public void unlock() {
    sync.release(1);
    }
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }

    当前线程不是锁的持有者抛出异常, 当前state值减去releases,如果为0时,锁释放。设置state的新值。

2).读写锁ReentrantReadWriteLock

​ 解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁。

​ 读写锁的内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能。而Sync继承自AQS,并且也提供了公平和非公平的实现。下面只介绍非公平的读写锁实现。我们知道AQS中只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state怎么表示写和读两种状态呢?ReentrantReadWriteLock巧妙地使用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。

1). WriteLock

  • public void lock()

    public void lock() {
    sync.acquire(1);
    }

    主要看:

    protected final boolean tryAcquire(int acquires) {
    /*
    * Walkthrough:
    * 1. If read count nonzero or write count nonzero
    *    and owner is a different thread, fail.
    * 2. If count would saturate, fail. (This can only
    *    happen if count is already nonzero.)
    * 3. Otherwise, this thread is eligible for lock if
    *    it is either a reentrant acquire or
    *    queue policy allows it. If so, update state
    *    and set owner.
    */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
    // (Note: if c != 0 and w == 0 then shared count != 0)
    if (w == 0 || current != getExclusiveOwnerThread())
    return false;
    if (w + exclusiveCount(acquires) > MAX_COUNT)
    throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    setState(c + acquires);
    return true;
    }
    if (writerShouldBlock() ||
    !compareAndSetState(c, c + acquires))
    return false;
    setExclusiveOwnerThread(current);
    return true;
    }

    writerShouldBlock 方法在公平锁和非公平锁实现不同:

    NonfairSync :
final boolean writerShouldBlock() {
return false; // writers can always barge
}
  • FairSync:

    final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
    }
  • public boolean tryLock( )

    public boolean tryLock( ) {
    return sync.tryWriteLock();
    }
    final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
    int w = exclusiveCount(c);
    if (w == 0 || current != getExclusiveOwnerThread())
    return false;
    if (w == MAX_COUNT)
    throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
    return false;
    setExclusiveOwnerThread(current);
    return true;
    }

    上面的逻辑与tryAcquire()方法类似。

  • public boolean tryLock(long timeout, TimeUnit unit)

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquire(arg) ||
    doAcquireNanos(arg, nanosTimeout);
    }
  • public void unlock()

    public void unlock() {
    sync.release(1);
    }
    protected final boolean tryRelease(int releases) {
    // 锁持有者是否为当前线程 , 不是则抛出异常
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //   state 为 0 则释放锁
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
    setExclusiveOwnerThread(null);
    // 设置新的state
    setState(nextc);
    return free;
    }
  • 2).ReadLock

    • public void lock()

      protected final int tryAcquireShared(int unused) {
      
      Thread current = Thread.currentThread();
      int c = getState();
      // 判断是否有线程是否持有写锁, 如果有判断是否是当前线程。如果有而不是当前线程返回-1;
      if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
      return -1;
      // 获取锁共享的数量
      int r = sharedCount(c);
      if (!readerShouldBlock() &&
      // 是否超过最大
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
      // 如果 分享数量为 0 ,则当前线程是第一个持有读锁的线程
      if (r == 0) {
      firstReader = current;
      firstReaderHoldCount = 1;
      } else if (firstReader == current) {
      firstReaderHoldCount++;
      } else {
      // 锁共享数量加一
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
      cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
      readHolds.set(rh);
      rh.count++;
      }
      return 1;
      }
      // 类似  但是是自旋获取
      return fullTryAcquireShared(current);
      }
    • public void unlock()

      public void unlock() {
      sync.releaseShared(1);
      }
      protected final boolean tryReleaseShared(int unused) {
      Thread current = Thread.currentThread();
      // 是否为第一个持有者
      if (firstReader == current) {
      // assert firstReaderHoldCount > 0;
      if (firstReaderHoldCount == 1)
      firstReader = null;
      else
      firstReaderHoldCount--;
      } else {
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
      rh = readHolds.get();
      int count = rh.count;
      if (count <= 1) {
      readHolds.remove();
      if (count <= 0)
      throw unmatchedUnlockException();
      }
      --rh.count;
      }
      for (;;) {
      int c = getState();
      int nextc = c - SHARED_UNIT;
      if (compareAndSetState(c, nextc))
      // Releasing the read lock has no effect on readers,
      // but it may allow waiting writers to proceed if
      // both read and write locks are now free.
      return nextc == 0;
      }
      }

      如以上代码所示,在无限循环里面,首先获取当前AQS状态值并将其保存到变量c,然后变量c被减去一个读计数单位后使用CAS操作更新AQS状态值,如果更新成功则查看当前AQS状态值是否为0,为0则说明当前己经没有读线程占用读锁,则tryReleaseShared返回true。然后会调用doReleaseShared方法释放一个由于获取写锁而被阻塞的线程,如果当前AQS状态值不为0,则说明当前还有其他线程持有了读锁,所以trγReleaseShared返回false。如果trγReleaseShared中的CAS更新AQS状态值失败,则自旋重试直到成功。

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: