您的位置:首页 > 编程语言 > Java开发

深入浅出Java并发包—读写锁ReentrantReadWriteLock原理分析(一)

2016-04-17 11:39 405 查看
Java里面真正意义的锁并不多,其实真正的实现Lock接口的类就三个,ReentrantLock和ReentrantReadWriteLock的两个内部类(ReentrantReadWriteLock实现了ReadWriteLock接口,并没有实现Lock接口,是其内部类ReadLock和WriteLock实现了Lock的接口),其他都是通过我们前面说的一些工具类实现了线程的阻塞。

 前面锁机制中提到的

ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念。我们也一直在强调这个特点。显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生。
但实际应用场景中我们会经常遇到这样的情况:某些资源需要并发访问,并且大部分时间是用来进行读操作的,写操作比较少,而锁是有一定的开销的,当并发比较大的时候,锁的开销就比较可观了。所以如果可能的话就尽量少用锁,如果非要用锁的话就尝试看能否能实现读写分离,将其改造为读写锁。

ReadWriteLock描述的是:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据)。我们来看一下他的API文档:

public
interface
ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

代码非常简单,就提供了取读锁和写锁的两个方法。很显然其用法也非常简单,我们来看一个示例。

//资源
class Resource{
    private
int
value;
 
    public
void
setValue(int value) {
       this.value = value;
    }
    public
int
getValue() {
       return
value;
    }

}

ReadWriteLock lock = new ReentrantReadWriteLock();
final Lock readLock = lock.readLock();
final Lock writeLock = lock.writeLock();
final Resource resource =
new Resource();
final Random random =
new Random();
    for(int i=0;i<20;++i){//写线程
       new Thread(){
           public
void
run() {
              writeLock.lock();
              try {
                  resource.setValue(resource.getValue()+1);
                  System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new
Date())+" - "+Thread.currentThread()+"获取了写锁,修正数据为:"+resource.getValue());
                  Thread.sleep(random.nextInt(1000));//随机休眠
              } catch (Exception e) {
                  e.printStackTrace();
              }finally{
                  writeLock.unlock();
              }
           };
       }.start();
    }
    for(int i=0;i<20;++i){//读线程
       new Thread(){
           public
void
run() {
              readLock.lock();
              try {
                  System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new
Date())+" - "+Thread.currentThread()+"获取了读锁,读取的数据为:"+resource.getValue());
                  Thread.sleep(random.nextInt(1000));//随机休眠
              } catch (Exception e) {
                  e.printStackTrace();
              }finally{
                  readLock.unlock();
              }
           };
       }.start();
    }

}

运行结果:

2013-06-16 17:43:25.765 - Thread[Thread-1,5,main]获取了写锁,修正数据为:1
2013-06-16 17:43:25.837 - Thread[Thread-0,5,main]获取了写锁,修正数据为:2
2013-06-16 17:43:25.968 - Thread[Thread-3,5,main]获取了写锁,修正数据为:3
2013-06-16 17:43:26.148 - Thread[Thread-2,5,main]获取了写锁,修正数据为:4
2013-06-16 17:43:27.015 - Thread[Thread-4,5,main]获取了写锁,修正数据为:5
2013-06-16 17:43:27.102 - Thread[Thread-6,5,main]获取了写锁,修正数据为:6
2013-06-16 17:43:27.731 - Thread[Thread-8,5,main]获取了写锁,修正数据为:7
2013-06-16 17:43:28.717 - Thread[Thread-5,5,main]获取了写锁,修正数据为:8
2013-06-16 17:43:29.139 - Thread[Thread-7,5,main]获取了写锁,修正数据为:9
2013-06-16 17:43:29.929 - Thread[Thread-12,5,main]获取了写锁,修正数据为:10
2013-06-16 17:43:30.701 - Thread[Thread-10,5,main]获取了写锁,修正数据为:11
2013-06-16 17:43:31.308 - Thread[Thread-14,5,main]获取了写锁,修正数据为:12
2013-06-16 17:43:32.287 - Thread[Thread-16,5,main]获取了写锁,修正数据为:13
2013-06-16 17:43:32.664 - Thread[Thread-18,5,main]获取了写锁,修正数据为:14
2013-06-16 17:43:33.518 - Thread[Thread-9,5,main]获取了写锁,修正数据为:15
2013-06-16 17:43:33.755 - Thread[Thread-11,5,main]获取了写锁,修正数据为:16
2013-06-16 17:43:33.889 - Thread[Thread-13,5,main]获取了写锁,修正数据为:17
2013-06-16 17:43:34.262 - Thread[Thread-15,5,main]获取了写锁,修正数据为:18
2013-06-16 17:43:34.690 - Thread[Thread-17,5,main]获取了写锁,修正数据为:19
2013-06-16 17:43:35.160 - Thread[Thread-19,5,main]获取了写锁,修正数据为:20
2013-06-16 17:43:35.694 - Thread[Thread-23,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.695 - Thread[Thread-25,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.694 - Thread[Thread-21,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.696 - Thread[Thread-29,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.695 - Thread[Thread-27,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.696 - Thread[Thread-31,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.697 - Thread[Thread-33,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.697 - Thread[Thread-35,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.698 - Thread[Thread-37,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.698 - Thread[Thread-39,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.698 - Thread[Thread-20,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.698 - Thread[Thread-22,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-24,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-26,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-28,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-30,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-32,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.699 - Thread[Thread-34,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.700 - Thread[Thread-38,5,main]获取了读锁,读取的数据为:20
2013-06-16 17:43:35.700 - Thread[Thread-36,5,main]获取了读锁,读取的数据为:20
结果我就不分析了,很明显写的时候其他写和读都不能操作,而读的时候,写不能操作,但其他线程的读操作都是可以正常执行的。

ReadWriteLock严格区分了读写操作,如果读操作使用了写入锁,那么降低读操作的吞吐量,如果写操作使用了读取锁,那么就可能发生数据错误。

查看API文档,文档中详细记录了该类的一些个特性:

公平性
非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。
公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。
重入性
读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。当然了只有写线程释放了锁,读线程才能获取重入锁。
写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。
另外读写锁最多支持65535个递归写入锁和65535个递归读取锁。(源码中使用了一个int类型的数据来记录,应该大于65535这个数据的,但为什么只支持65535呢?我们后面会详细说到)
锁降级
写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。
锁升级
读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁视图获取写入锁而都不释放读取锁时就会发生死锁。
锁获取中断
读取锁和写入锁都支持获取锁期间被中断。这个和独占锁一致。
条件变量
写入锁提供了条件变量(Condition)的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个UnsupportedOperationException异常。
重入数
读取锁和写入锁的数量最大分别只能是65535(包括重入数)。
监测
此类支持一些确定是保持锁还是争用锁的方法。这些方法设计用于监视系统状态,而不是同步控制。
看了这么多的特性,那他究竟是怎么实现的呢?我们一起来剖析下!

首先我们看到这个类似乎有两把锁:readLock/writeLock。但是如果真的是两个锁的话,它们之间又是如何相互影响的呢?

事实上在ReentrantReadWriteLock里锁的实现是靠内部类java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的。这个类看起来比较眼熟,它是AQS的一个子类,这中类似的结构在CountDownLatch、ReentrantLock、Semaphore里面都存在。同样它也有两种实现:公平锁和非公平锁,也就是java.util.concurrent.locks.ReentrantReadWriteLock.FairSync和java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync。

在ReentrantReadWriteLock里面的锁主体就是一个Sync,也就是上面提到的FairSync或者NonfairSync,所以说实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一样而已。我们来分析下他的源代码:

public
class
ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable

public
static class
ReadLock
implements
Lock, java.io.Serializable

public
static class
WriteLock
implements
Lock, java.io.Serializable

/** Inner class providing readlock */
private
final
ReentrantReadWriteLock.ReadLock
readerLock;
/** Inner class providing writelock */
private
final
ReentrantReadWriteLock.WriteLock
writerLock;
/** Performs all synchronization mechanics */

private
final
Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() {
return
writerLock; }

public ReentrantReadWriteLock.ReadLock  readLock()  {
return
readerLock; }

public ReentrantReadWriteLock() {
   this(false);
}
public ReentrantReadWriteLock(boolean fair) {
   sync = (fair)? new FairSync() :
new NonfairSync();
   readerLock = new ReadLock(this);
   writerLock = new WriteLock(this);
}

很明显获取锁就是直接返回了对应内部类的读锁和写锁,而这两把锁在读写锁实例创建的时候进行了初始化。那这两把锁又是怎么实现锁操作,又怎样协作的呢?

public
void
lock() {//write Lock
    sync.acquire(1);

}

public
void
lock() {//read Lock
    sync.acquireShared(1);
}

这里和正常的锁操作一样,都是调用同步器的acquire(独占锁)和acquireShared(共享锁)操作的,我们来看一下具体的内部实现。

public
final void

acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();

}

public
final void

acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

acquire和acquireShared仍然是调用AQS父类的操作,先尝试,然后不行进入队列阻塞等待,具体可参考之前锁机制的部分的描述,我们重点来看下前面第一步尝试部分不同的地方。首先我们来看写锁部分。

  protected
final boolean

tryAcquire(int acquires) {
      Thread current = Thread.currentThread();
      int c = getState();
      int w =
exclusiveCount(c);
      if (c != 0) {
          // (Note: if c != 0 and w == 0 then shared count != 0)
          if (w == 0 || current != getExclusiveOwnerThread())
              return
false
;
          if (w +
exclusiveCount(acquires) > MAX_COUNT)
                throw
new
Error("Maximum lock count exceeded");
      }
      if ((w==0&&writerShouldBlock(current))||!compareAndSetState(c,c+acquires))
           return
false
;
      setExclusiveOwnerThread(current);
      return
true
;
}

这段代码首先取当前锁的个数,然后取写锁的个数w,这段取写锁个数的地方有点奇怪。在AQS部分我们讲到AQS中有一个state字段(int类型,32位)用来描述有多少线程获持有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数),在共享锁的时代就是持有锁的数量。上面我们提到,ReadWriteLock的读、写锁是相关但是又不一致的,所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量。显然现在一个state就不够用了。于是在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者重入数量)。2^16-1=65535,这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了。

static
final int
SHARED_SHIFT   = 16;

static
final int
EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

static
int
exclusiveCount(int c) {
return c & EXCLUSIVE_MASK; }

了解了之后我们再来看这段代码,就好理解多了。既然写锁是低16位,那就取低16位的最大值与当前的c做与运算,高16位和0与运算后是0,那剩下的就是低位运算的值,就全是写锁持有线程的数目了!

1、在取到写锁线程的数目后,首先判断是否已经有线程持有了锁,如果已经有线程持有了锁(c!=0),则看当前写锁线程的数目,如果写线程数(w)为0(那么读线程数就不为0)或者独占锁线程(持有锁的线程)不是当前线程就返回失败,如果写入锁的数量(其实是重入数)大于65535就抛出一个Error异常。

2、如果当且写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果通过CAS增加写线程数失败也返回失败。

3、如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,返回成功!

整个流程还是比较简单的,但是有人会有疑问了,第2步中,既然写线程为0,读线程也为0,为啥还要判断是否需要阻塞的呢?这一点就涉及到公平锁和不公平锁的实现:

final
boolean
writerShouldBlock(Thread current) {
     // only proceed if queue is empty or current thread at head
     return !isFirst(current);

}

final
boolean
writerShouldBlock(Thread current) {
     return
false
; // writers can always barge
}

公平锁只有头结点才不会阻塞,非公平锁则无阻塞。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: