自己动手写带有事务支持的分布式Key-Value存储系统——读写锁
2014-03-17 22:57
519 查看
为了实现高并发的锁机制事务,需要使用读写锁进行并发控制。Java类库本身提供了读写锁,但是其灵活性对于这个系统的需求是不够的。本系统的读写锁需要实现在单个线程内部能够任意的加写锁,加读锁,并且不会阻塞,锁的释放可以单次单次进行,也可以一次完成(多次加锁,一次释放);但是多个线程之间仍然符合读写锁的互斥性。
为了实现这样的需求,我的设计如下
|--------------| 读线程队列 |-----------------------|-----------------------|--------------------|
| |--------------------------------------->|线程+读锁定次数|线程+读锁定次数|。。。。。。 |
| |
| 读写锁 | 写线程信息 |---------------------------|
|-------------|----------------------------------------->| 写线程+写锁定次数|
|---------------------------|
每一个读写锁,保存一个读线程队列,队列中保存加了读锁的线程,以及每个线程加读锁的次数;并且保存一个写线程信息(线程+加写锁的次数)。每一次加读锁(共享锁)的时候,必须是写锁未被锁定,或者写锁的锁定者是当前线程,否则就等待,加读锁就是将当前线程对应的读锁定次数加1;加写锁(排他锁)的时候,已加写锁线程必须为空,或者为当前线程并且读线程队列为空,或者只有当前线程,否则等待,加锁时就是将写锁定次数加一。其余的操作读者可以自行推理。如果有问题,请留言相互讨论。
实现的核心代码,所有代码详见:https://github.com/FoOTOo/DoDo/tree/master/java/DoDo/DoDo/storage/src/main/java/org/footoo/storage/transaction/lock:
org.footoo.storage.transaction.lock.RandomSync
封装后
org.footoo.storage.transaction.lock.RandomSyncRWLock
org.footoo.storage.transaction.lock.Lock
org.footoo.storage.transaction.lock.RWLock
为了实现这样的需求,我的设计如下
|--------------| 读线程队列 |-----------------------|-----------------------|--------------------|
| |--------------------------------------->|线程+读锁定次数|线程+读锁定次数|。。。。。。 |
| |
| 读写锁 | 写线程信息 |---------------------------|
|-------------|----------------------------------------->| 写线程+写锁定次数|
|---------------------------|
每一个读写锁,保存一个读线程队列,队列中保存加了读锁的线程,以及每个线程加读锁的次数;并且保存一个写线程信息(线程+加写锁的次数)。每一次加读锁(共享锁)的时候,必须是写锁未被锁定,或者写锁的锁定者是当前线程,否则就等待,加读锁就是将当前线程对应的读锁定次数加1;加写锁(排他锁)的时候,已加写锁线程必须为空,或者为当前线程并且读线程队列为空,或者只有当前线程,否则等待,加锁时就是将写锁定次数加一。其余的操作读者可以自行推理。如果有问题,请留言相互讨论。
实现的核心代码,所有代码详见:https://github.com/FoOTOo/DoDo/tree/master/java/DoDo/DoDo/storage/src/main/java/org/footoo/storage/transaction/lock:
org.footoo.storage.transaction.lock.RandomSync
/** * Alipay.com Inc. * Copyright (c) 2004-2014 All Rights Reserved. */ package org.footoo.storage.transaction.lock; import java.util.LinkedList; import java.util.List; import org.footoo.common.log.Logger; import org.footoo.common.log.LoggerFactory; /** * <h1>同步函数,这哥们必须是线程安全的</h1> * <b>可以加读锁的前提</b> * <ul> * <li>写锁没有锁定,即加写锁的线程为null,或者为当前线程 * </ul> * <br> * <b>可以加写锁的前提</b> * <ul> * <li>写锁没有锁定,即加写锁的线程为null,或者为当前线程 * <li>读锁没有锁定,即加读锁的线程为null,或者是当前线程 * </ul> * * <br> * <h2>问题<h2> * <ul> * <li>随机的唤醒方式会出现写线程饥饿的现象,即写线程锁定了, * 可以锁定大量的读线程,导致读线程得不到运行, * 详见org.footoo.storage.SyncTest2。 * 这样就需要一种公平的唤醒方式来提高公平性 * </ul> * * @author jeff * @version $Id: Sync.java, v 0.1 2014年3月17日 下午4:48:48 jeff Exp $ */ public class RandomSync { /** 锁对象 */ private final Object lock = new Object(); /** 获得了共享锁的线程 */ private final List<ThreadRefCount> sharedThreads = new LinkedList<>(); /** 获得独占锁的线程 */ private ThreadRefCount writeThreads = null; /** 日志工具 */ private static final Logger logger = LoggerFactory.getLogger(RandomSync.class); /** 是否已经被释放 */ private boolean released = false; /** * 共享锁的方式锁 * * @throws LockReleasedException */ public void lockShared() throws LockReleasedException { synchronized (lock) { waitUtilCanRead(); //进行加读锁,其实就是增加相应的线程的锁计数 ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread()); refCount.incRefCount(); } } /** * 排他的方式锁 * * @throws LockReleasedException */ public void lockWrite() throws LockReleasedException { synchronized (lock) { waitUtilCanWrite(); if (writeThreads == null) { writeThreads = new ThreadRefCount(Thread.currentThread(), 0); } //进行写锁的加锁,其实就是增加写线程的加锁计数 writeThreads.incRefCount(); } } /** * 解锁共享锁 * * @throws LockReleasedException * @throws IllegalStateLockException */ public void unlockShared() throws LockReleasedException, IllegalStateLockException { synchronized (lock) { testSyncOk(); //获取线程对应的应用计数情况 ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread()); //状态不合法 if (refCount == null || refCount.getRefCount() <= 0) { throw new IllegalStateLockException("无法解锁未处于锁定状态的读锁"); } //减去一个引用计数 refCount.decRefCount(); if (refCount.getRefCount() == 0) { sharedThreads.remove(refCount); //唤醒等待的写线程 if (sharedThreads.isEmpty() && writeThreads == null) { lock.notifyAll(); } } } } /** * 解锁写锁 * * @throws LockReleasedException * @throws IllegalStateLockException */ public void unlockWrite() throws LockReleasedException, IllegalStateLockException { synchronized (lock) { testSyncOk(); if (!writeThreads.getThread().equals(Thread.currentThread())) { throw new IllegalStateLockException("无法解锁未被锁定的写锁"); } writeThreads.decRefCount(); if (writeThreads.getRefCount() == 0) { //写锁彻底被解开 writeThreads = null; //唤醒所有等待的线程 lock.notifyAll(); } } } /** * 尝试timeoutms长度时间的加锁 * * @param timeoutms * @return * @throws LockReleasedException */ public boolean tryLockShared(int timeoutms) throws LockReleasedException { synchronized (lock) { testSyncOk(); if (!isWriteLockUnlock()) { if (timeoutms > 0) { try { lock.wait(timeoutms); } catch (InterruptedException e) { logger.warn(e, "try lock shared时等待异常"); } } else { return false; } testSyncOk(); } //醒来重新测试 if (isWriteLockUnlock()) { ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread()); refCount.incRefCount(); return true; } else { return false; } } } /** * 不阻塞的方式加锁 * * @return * @throws LockReleasedException */ public boolean tryLockShared() throws LockReleasedException { return tryLockShared(0); } /** * 尝试在timeoutms毫秒内加写锁 * * @param timeoutms * @return * @throws LockReleasedException */ public boolean tryLockWrite(int timeoutms) throws LockReleasedException { synchronized (lock) { if (!canAddWriteLock()) { if (timeoutms > 0) { try { lock.wait(timeoutms); } catch (InterruptedException e) { logger.error(e, "try lock write 等待失败"); } } else { return false; } testSyncOk(); } //测试状态 if (!canAddWriteLock()) { return false; } else { if (writeThreads == null) { writeThreads = new ThreadRefCount(Thread.currentThread(), 0); } writeThreads.incRefCount(); return true; } } } /** * 非阻塞方式加锁 * * @return * @throws LockReleasedException */ public boolean tryLockWrite() throws LockReleasedException { return tryLockWrite(0); } /** * 释放当前线程的所有的资源 * * @throws LockReleasedException */ public void releaseSelf() throws LockReleasedException { synchronized (lock) { testSyncOk(); //释放读锁资源 ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread()); if (refCount != null) { sharedThreads.remove(refCount); } //释放写锁 if (writeThreads != null && writeThreads.getThread() == Thread.currentThread()) { writeThreads = null; lock.notifyAll(); } } } /** * 释放掉这个锁 * * @throws LockReleasedException */ public void release() throws LockReleasedException { synchronized (lock) { //不允许重复释放 testSyncOk(); released = true; } } /** * 等待当前线程可以加写锁 * * @throws LockReleasedException */ private void waitUtilCanRead() throws LockReleasedException { synchronized (lock) { testSyncOk(); while (!isWriteLockUnlock()) { try { lock.wait(); } catch (InterruptedException e) { logger.warn(e, "线程等待失败"); } //重新测试锁状态 testSyncOk(); } } } /** * 等待直到可以加写锁 * * @throws LockReleasedException */ private void waitUtilCanWrite() throws LockReleasedException { synchronized (lock) { testSyncOk(); //等待直到可以加写锁 while (!canAddWriteLock()) { try { lock.wait(); } catch (InterruptedException e) { logger.warn(e, "线程等待失败"); } //重新测试锁状态 testSyncOk(); } } } /** * 是否可以加读锁 * * @return */ private boolean canAddWriteLock() { return isWriteLockUnlock() && isReadLockUnlock(); } /** * 是否读锁还没有锁定 * * @return */ private boolean isReadLockUnlock() { return sharedThreads.isEmpty() || (sharedThreads.size() == 1 && sharedThreads.get(0).getThread() == Thread .currentThread()); } /** * 写锁是否未锁 * * @return */ private boolean isWriteLockUnlock() { synchronized (lock) { return (writeThreads == null) || (writeThreads.getThread().equals(Thread.currentThread())); } } /** * 获取线程therad对应的refCount * * @param refCounts * @param thread * @return */ private ThreadRefCount getRefCount(List<ThreadRefCount> refCounts, Thread thread) { ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0); int index = 0; synchronized (lock) { if ((index = refCounts.indexOf(refCount)) == -1) { refCount = null; } else { refCount = refCounts.get(index); } return refCount; } } /** * 获取或者创建线程的加锁信息 * * @param refCounts * @param thread * @return */ private ThreadRefCount getOrCreateRef(List<ThreadRefCount> refCounts, Thread thread) { ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0); int index = 0; synchronized (lock) { if ((index = refCounts.indexOf(refCount)) == -1) { refCounts.add(refCount); } else { refCount = refCounts.get(index); } return refCount; } } /** * 确保锁没有被释放 * * @throws LockReleasedException */ public void testSyncOk() throws LockReleasedException { synchronized (lock) { if (released) { throw new LockReleasedException(); } } } /** * 计数线程锁定的次数 * * @author jeff * @version $Id: Sync.java, v 0.1 2014年3月17日 下午5:45:26 jeff Exp $ */ private class ThreadRefCount { /** 锁定的线程 */ private Thread thread; /** 锁定的次数 */ private int refCount = 0; public ThreadRefCount(Thread thread, int refCount) { this.thread = thread; this.refCount = refCount; } /** * 增加引用计数 */ public void incRefCount() { refCount++; } /** * 减少引用计数 */ public void decRefCount() { refCount--; } /** * Getter method for property <tt>thread</tt>. * * @return property value of thread */ public Thread getThread() { return thread; } /** * Setter method for property <tt>thread</tt>. * * @param thread value to be assigned to property thread */ @SuppressWarnings("unused") public void setThread(Thread thread) { this.thread = thread; } /** * Getter method for property <tt>refCount</tt>. * * @return property value of refCount */ public int getRefCount() { return refCount; } /** * Setter method for property <tt>refCount</tt>. * * @param refCount value to be assigned to property refCount */ @SuppressWarnings("unused") public void setRefCount(int refCount) { this.refCount = refCount; } /** * @see java.lang.Object#hashCode() */ public int hashCode() { return thread.hashCode(); } /** * @see java.lang.Object#equals(java.lang.Object) */ public boolean equals(Object other) { if (this == other) { return true; } if (!(other instanceof ThreadRefCount)) { return false; } return thread.equals(((ThreadRefCount) other).getThread()); } } }
封装后
org.footoo.storage.transaction.lock.RandomSyncRWLock
/** * Alipay.com Inc. * Copyright (c) 2004-2014 All Rights Reserved. */ package org.footoo.storage.transaction.lock; import org.footoo.common.log.Logger; import org.footoo.common.log.LoggerFactory; /** * 读写锁,实际只是封装了RandomSync * * @author jeff * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:09:45 jeff Exp $ */ public class RandomSyncRWLock implements RWLock { /** 实际的同步工具 */ private RandomSync sync = new RandomSync(); /** 读锁 */ private Lock readLockInner = new ReadLock(); /** 写锁 */ private Lock writeLockInner = new WriteLock(); /** 日志工具 */ private static final Logger logger = LoggerFactory.getLogger(RandomSyncRWLock.class); /** * @see org.footoo.storage.transaction.lock.RWLock#readLock() */ @Override public Lock readLock() { return readLockInner; } /** * @see org.footoo.storage.transaction.lock.RWLock#writeLock() */ @Override public Lock writeLock() { return writeLockInner; } /** * @see org.footoo.storage.transaction.lock.RWLock#releaseSelf() */ @Override public void releaseSelf() { try { sync.releaseSelf(); } catch (LockReleasedException e) { logger.warn(e, "释放当前线程的读写锁发生异常"); } } /** * @see org.footoo.storage.transaction.lock.RWLock#release() */ @Override public void release() { try { sync.release(); } catch (LockReleasedException e) { logger.error(e, "释放读写锁发生异常"); } } /** * 读锁 * * @author jeff * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:12:42 jeff Exp $ */ private class ReadLock implements Lock { @Override public void lock() { try { sync.lockShared(); } catch (LockReleasedException e) { logger.warn(e); } } @Override public void unlock() { try { sync.unlockShared(); } catch (LockReleasedException | IllegalStateLockException e) { logger.warn(e); } } @Override public boolean tryLock() { try { return sync.tryLockShared(); } catch (LockReleasedException e) { logger.warn(e); return false; } } @Override public boolean tryLock(int ms) { try { return sync.tryLockShared(ms); } catch (LockReleasedException e) { logger.warn(e); return false; } } } /** * 写锁 * * @author jeff * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:13:40 jeff Exp $ */ private class WriteLock implements Lock { @Override public void lock() { try { sync.lockWrite(); } catch (LockReleasedException e) { logger.warn(e); } } @Override public void unlock() { try { sync.unlockWrite(); } catch (LockReleasedException | IllegalStateLockException e) { logger.warn(e); } } @Override public boolean tryLock() { try { return sync.tryLockWrite(); } catch (LockReleasedException e) { logger.warn(e); return false; } } @Override public boolean tryLock(int ms) { try { return sync.tryLockWrite(ms); } catch (LockReleasedException e) { logger.warn(e); return false; } } } }所有接口
org.footoo.storage.transaction.lock.Lock
/** * Alipay.com Inc. * Copyright (c) 2004-2014 All Rights Reserved. */ package org.footoo.storage.transaction.lock; /** * 锁 * * @author jeff * @version $Id: Rock.java, v 0.1 2014年3月14日 下午7:10:46 jeff Exp $ */ public interface Lock { /** * 锁定 */ public void lock(); /** * 解锁 */ public void unlock(); /** * 尝试加锁,立即返回 * * @return */ public boolean tryLock(); /** * 尝试加锁,ms毫秒后超时 * * @param ms * @return */ public boolean tryLock(int ms); }
org.footoo.storage.transaction.lock.RWLock
/** * Alipay.com Inc. * Copyright (c) 2004-2014 All Rights Reserved. */ package org.footoo.storage.transaction.lock; /** * 读写锁 * <ul> * <li>能够实现基本的读写锁功能 * <li>同线程的读写锁不会阻塞 * </ul> * * @author jeff * @version $Id: RWRock.java, v 0.1 2014年3月14日 下午7:10:05 jeff Exp $ */ public interface RWLock { /** * 获得读锁 * * @return */ public Lock readLock(); /** * 写锁 * * @return */ public Lock writeLock(); /** * 释放当前线程的所有读写锁 */ public void releaseSelf(); /** * 释放锁 */ public void release(); }
相关文章推荐
- 自己动手写带有事务支持的分布式Key-Value存储系统——锁管理器
- 自己动手写带有事务支持的分布式Key-Value存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统[转摘]
- (转)利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用TokyoTyrant构建兼容Memcached协议、支持故障转移、高并发的分布式Key-value持久存储系统(转)
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统[转载]
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统
- 利用Tokyo Tyrant构建兼容Memcached协议、支持故障转移、高并发的分布式key-value持久存储系统[原创]
- Explorer : 发布一个key-value存储系统,带有客户端和服务器端
- 分布式key-value存储系统的比较列表
- Explorer : 发布一个key-value存储系统,带有客户端和服务器端
- 分布式key-value存储系统的比较列表
- 动手改造Ibatis,使其支持文件系统存储数据列 预览
- 动手改造Ibatis,使其支持文件系统存储数据列 之 源码下载编译和SqlMapConfig解析