Java中的锁——ReentrantReadWriteLock(读写锁)
2015-12-21 10:07
597 查看
上一篇里讲的ReentrankLock是一种排他锁,即同一时间只能有一个线程进入。而读写锁在同一时刻允许多个读线程访问,但是在写线程访问时,所有的读线程和其他线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读写锁,使得并发性比一般的排它锁有了很大提升。因为大多数应用场景都是读多于写的,因此在这样的情况下,读写锁可以提高吞吐量。下图描述了关于读写锁的三个特性:公平性、重入性和锁降级。
上图中同步状态表示一个线程已经获取了3次读锁,同时也连续获取2次写锁。想要快速读取这个状态,可以通过位运算。当前状态为S,写状态等于S&0x0000FFFF,读状态等S>>>16。当读状态+1时,等于S+1,写状态加1等于S+(1<<16)。其实现代码如下所示
读锁的获取实际定义在内部同步器Sync的tryAcquire方法中,其源码如下:
读锁的释放实际定义在内部同步器Sync的tryRelease方法中,其源码如下:
读锁的获取定义在内部同步器Sync的tryAcquireShared方法中
读锁的释放在内部同步器Sync的tryReleaseShared方法中
这个释放的过程大致分为两步,第一步减少holdCounter的读锁值,如果已经减为一,则移除holdCounter。第二步就是将state的读状态减一。
1 读写锁的使用
现在有这样一个需求,需要构造一个数据结构作为一个系统中的临时缓存,需要保证线程安全。我们在这里就可以利用读写锁来实现。/** * 利用读写锁实现的一个数据结构 * @author songxu * */ public class ReadWriteCache { //利用hashmap作为底层数据结构 private Map<String, Object> cache=new HashMap<String, Object>(); //构造读写锁 private ReentrantReadWriteLock readwritelock=new ReentrantReadWriteLock(); //读锁 private Lock readLock=readwritelock.readLock(); //写锁 private Lock writeLock=readwritelock.writeLock(); /** * 存入数据 * @param key 键 * @param value 值 */ public void put(String key,Object value) { writeLock.lock(); //锁一定在try块之外 try { cache.put(key, value); } finally { writeLock.unlock(); } } /** * 获取数据 * @param key 键 * @return 值 */ public Object get(String key) { readLock.lock(); try { return cache.get(key); } finally { readLock.unlock(); } } }在上述代码中,put方法在更新或插入数据前必须提前获取写锁,当获取写锁之后,其他线程对于读锁和写锁的获取均被阻塞,只有写锁释放后,其他读操作才能继续。在get方法中,需要获取读锁,而此时其他线程均可访问该方法而不被阻塞。可以说这是一个类似于concurrentHashMap的原型,但它的效率肯定没有concurrentHashMap高,但应该比HashTable要强一些。
2 读写锁的实现原理
2.1 读写状态
读写锁同样利用同步器实现锁的功能,在ReetrantLock中,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态上维护多个读线程和一个写线程的状态。如果想在一个整型变量上维护这样一个状态,那么采用按位分割的方式是一个不错的选择。如下图所示,将一个变量分为两个部分,高16位表示读,低16位表示写上图中同步状态表示一个线程已经获取了3次读锁,同时也连续获取2次写锁。想要快速读取这个状态,可以通过位运算。当前状态为S,写状态等于S&0x0000FFFF,读状态等S>>>16。当读状态+1时,等于S+1,写状态加1等于S+(1<<16)。其实现代码如下所示
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2.2 写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。读锁的获取实际定义在内部同步器Sync的tryAcquire方法中,其源码如下:
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState();//获取当前状态 int w = exclusiveCount(c);//获取写的状态 if (c != 0) { //如果存在读锁 或者当前线程不是获取写锁的线程 返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 增加写状态 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }该方法的逻辑很简单,首先判断读写状态。如果不为0且存在读锁或者已存在的写锁并非当前线程获取到,则写锁不能获取,只能等待其他线程都释放了锁,才能获取。
读锁的释放实际定义在内部同步器Sync的tryRelease方法中,其源码如下:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }这个释放过程与ReentrantLock的过程基本类似,每次释放均减少些状态,当写状态为0时表示写锁已经被释放。
2.3 读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他线程访问时,读锁总会被成功地获取。如果当前线程已经获取了读锁,则增加读状态,如果获取读锁时写锁已经被其他线程获取,则进入等待状态。读锁的获取定义在内部同步器Sync的tryAcquireShared方法中
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //如果其他线程已经获取了写锁,则失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c);//获取读锁的数量 if (!readerShouldBlock() && r < MAX_COUNT && //CAS更新状态 因为可能多线程操作 compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }这个tryRelease方法首先需要检查是否其他线程已经获取了写锁,如果被获取则当前线程失败,进入等待状态。否则如果当前线程满足对state的操作条件,就利用CAS设置state+SHARED_UNIT,实际上就是读状态+1。但是需要注意,这个state是全局的,即所有线程获取读锁次数的总和,而为了方便计算本线程的读锁次数以及释放掉锁,需要在ThreadLocal中维护一个变量。这就是HoldCounter。源码下半部分的基本做的事情就是在让HoldCounter加一。
读锁的释放在内部同步器Sync的tryReleaseShared方法中
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //如果当前线程是第一个读者 if (firstReader == current) { // assert firstReaderHoldCount > 0; // 如果第一个读者的读锁已经为1 那么第一个读者置为null if (firstReaderHoldCount == 1) firstReader = null; else //第一个读者读锁数量减一 firstReaderHoldCount--; } else { //否则从缓存中获取当前线程的读锁数量 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count;//读锁数量减一 } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT;// state读锁状态减一 if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; //如果state为0,表示无锁状态,返回true } }
这个释放的过程大致分为两步,第一步减少holdCounter的读锁值,如果已经减为一,则移除holdCounter。第二步就是将state的读状态减一。
相关文章推荐
- Java系列,《Java核心技术 卷1》,chapter 13,集合
- javaweb学习总结(十一)――使用Cookie进行会话管理
- Java中继承的规则
- eclipse 在loading workbench 时闪退
- JDK内置工具使用
- Java-List排序
- 5种解决Java独占写文件的方法
- Java多线程——死锁
- Spring JdbcTemplate框架搭建及其增删改查使用指南
- 基本排序_直接插入排序_Java实现
- Java基础学习总结(35)——Java正则表达式详解
- Java基础学习总结(35)——Java正则表达式详解
- Java基础学习总结(35)——Java正则表达式详解
- Spring CommonsMultipartResolver文件上传的使用
- Java正则表达式详解
- Java线程池中线程的状态简介
- Myeclipse XSD配置XML自动提示
- Java.IO包-File 学习笔记
- Java线程Dump分析工具--jstack
- Java Web系列:Java Web基础