您的位置:首页 > 产品设计 > UI/UE

自己动手写带有事务支持的分布式Key-Value存储系统——读写锁

2014-03-17 22:57 519 查看
为了实现高并发的锁机制事务,需要使用读写锁进行并发控制。Java类库本身提供了读写锁,但是其灵活性对于这个系统的需求是不够的。本系统的读写锁需要实现在单个线程内部能够任意的加写锁,加读锁,并且不会阻塞,锁的释放可以单次单次进行,也可以一次完成(多次加锁,一次释放);但是多个线程之间仍然符合读写锁的互斥性。

为了实现这样的需求,我的设计如下

|--------------| 读线程队列 |-----------------------|-----------------------|--------------------|

| |--------------------------------------->|线程+读锁定次数|线程+读锁定次数|。。。。。。 |

| |

| 读写锁 | 写线程信息 |---------------------------|

|-------------|----------------------------------------->| 写线程+写锁定次数|

|---------------------------|

每一个读写锁,保存一个读线程队列,队列中保存加了读锁的线程,以及每个线程加读锁的次数;并且保存一个写线程信息(线程+加写锁的次数)。每一次加读锁(共享锁)的时候,必须是写锁未被锁定,或者写锁的锁定者是当前线程,否则就等待,加读锁就是将当前线程对应的读锁定次数加1;加写锁(排他锁)的时候,已加写锁线程必须为空,或者为当前线程并且读线程队列为空,或者只有当前线程,否则等待,加锁时就是将写锁定次数加一。其余的操作读者可以自行推理。如果有问题,请留言相互讨论。

实现的核心代码,所有代码详见:https://github.com/FoOTOo/DoDo/tree/master/java/DoDo/DoDo/storage/src/main/java/org/footoo/storage/transaction/lock

org.footoo.storage.transaction.lock.RandomSync

/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;

import java.util.LinkedList;
import java.util.List;

import org.footoo.common.log.Logger;
import org.footoo.common.log.LoggerFactory;

/**
* <h1>同步函数,这哥们必须是线程安全的</h1>
* <b>可以加读锁的前提</b>
* <ul>
*  <li>写锁没有锁定,即加写锁的线程为null,或者为当前线程
* </ul>
* <br>
* <b>可以加写锁的前提</b>
* <ul>
*  <li>写锁没有锁定,即加写锁的线程为null,或者为当前线程
*  <li>读锁没有锁定,即加读锁的线程为null,或者是当前线程
* </ul>
*
* <br>
* <h2>问题<h2>
* <ul>
*  <li>随机的唤醒方式会出现写线程饥饿的现象,即写线程锁定了,
*  可以锁定大量的读线程,导致读线程得不到运行,
*  详见org.footoo.storage.SyncTest2。
*  这样就需要一种公平的唤醒方式来提高公平性
* </ul>
*
* @author jeff
* @version $Id: Sync.java, v 0.1 2014年3月17日 下午4:48:48 jeff Exp $
*/
public class RandomSync {
/** 锁对象 */
private final Object               lock          = new Object();
/** 获得了共享锁的线程 */
private final List<ThreadRefCount> sharedThreads = new LinkedList<>();
/** 获得独占锁的线程 */
private ThreadRefCount             writeThreads  = null;
/** 日志工具 */
private static final Logger        logger        = LoggerFactory.getLogger(RandomSync.class);
/** 是否已经被释放 */
private boolean                    released      = false;

/**
* 共享锁的方式锁
*
* @throws LockReleasedException
*/
public void lockShared() throws LockReleasedException {
synchronized (lock) {
waitUtilCanRead();
//进行加读锁,其实就是增加相应的线程的锁计数
ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread());
refCount.incRefCount();
}
}

/**
* 排他的方式锁
*
* @throws LockReleasedException
*/
public void lockWrite() throws LockReleasedException {
synchronized (lock) {
waitUtilCanWrite();
if (writeThreads == null) {
writeThreads = new ThreadRefCount(Thread.currentThread(), 0);
}
//进行写锁的加锁,其实就是增加写线程的加锁计数
writeThreads.incRefCount();
}
}

/**
* 解锁共享锁
*
* @throws LockReleasedException
* @throws IllegalStateLockException
*/
public void unlockShared() throws LockReleasedException, IllegalStateLockException {
synchronized (lock) {
testSyncOk();
//获取线程对应的应用计数情况
ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread());
//状态不合法
if (refCount == null || refCount.getRefCount() <= 0) {
throw new IllegalStateLockException("无法解锁未处于锁定状态的读锁");
}
//减去一个引用计数
refCount.decRefCount();
if (refCount.getRefCount() == 0) {
sharedThreads.remove(refCount);
//唤醒等待的写线程
if (sharedThreads.isEmpty() && writeThreads == null) {
lock.notifyAll();
}
}
}
}

/**
* 解锁写锁
*
* @throws LockReleasedException
* @throws IllegalStateLockException
*/
public void unlockWrite() throws LockReleasedException, IllegalStateLockException {
synchronized (lock) {
testSyncOk();

if (!writeThreads.getThread().equals(Thread.currentThread())) {
throw new IllegalStateLockException("无法解锁未被锁定的写锁");
}

writeThreads.decRefCount();
if (writeThreads.getRefCount() == 0) {
//写锁彻底被解开
writeThreads = null;
//唤醒所有等待的线程
lock.notifyAll();
}
}
}

/**
* 尝试timeoutms长度时间的加锁
*
* @param timeoutms
* @return
* @throws LockReleasedException
*/
public boolean tryLockShared(int timeoutms) throws LockReleasedException {
synchronized (lock) {
testSyncOk();
if (!isWriteLockUnlock()) {
if (timeoutms > 0) {
try {
lock.wait(timeoutms);
} catch (InterruptedException e) {
logger.warn(e, "try lock shared时等待异常");
}
} else {
return false;
}
testSyncOk();
}
//醒来重新测试
if (isWriteLockUnlock()) {
ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread());
refCount.incRefCount();
return true;
} else {
return false;
}
}
}

/**
* 不阻塞的方式加锁
*
* @return
* @throws LockReleasedException
*/
public boolean tryLockShared() throws LockReleasedException {
return tryLockShared(0);
}

/**
* 尝试在timeoutms毫秒内加写锁
*
* @param timeoutms
* @return
* @throws LockReleasedException
*/
public boolean tryLockWrite(int timeoutms) throws LockReleasedException {
synchronized (lock) {
if (!canAddWriteLock()) {
if (timeoutms > 0) {
try {
lock.wait(timeoutms);
} catch (InterruptedException e) {
logger.error(e, "try lock write 等待失败");
}
} else {
return false;
}
testSyncOk();
}
//测试状态
if (!canAddWriteLock()) {
return false;
} else {
if (writeThreads == null) {
writeThreads = new ThreadRefCount(Thread.currentThread(), 0);
}
writeThreads.incRefCount();
return true;
}
}
}

/**
* 非阻塞方式加锁
*
* @return
* @throws LockReleasedException
*/
public boolean tryLockWrite() throws LockReleasedException {
return tryLockWrite(0);
}

/**
* 释放当前线程的所有的资源
*
* @throws LockReleasedException
*/
public void releaseSelf() throws LockReleasedException {
synchronized (lock) {
testSyncOk();

//释放读锁资源
ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread());
if (refCount != null) {
sharedThreads.remove(refCount);
}

//释放写锁
if (writeThreads != null && writeThreads.getThread() == Thread.currentThread()) {
writeThreads = null;
lock.notifyAll();
}
}
}

/**
* 释放掉这个锁
*
* @throws LockReleasedException
*/
public void release() throws LockReleasedException {
synchronized (lock) {
//不允许重复释放
testSyncOk();
released = true;
}
}

/**
* 等待当前线程可以加写锁
*
* @throws LockReleasedException
*/
private void waitUtilCanRead() throws LockReleasedException {
synchronized (lock) {
testSyncOk();
while (!isWriteLockUnlock()) {
try {
lock.wait();
} catch (InterruptedException e) {
logger.warn(e, "线程等待失败");
}
//重新测试锁状态
testSyncOk();
}
}
}

/**
* 等待直到可以加写锁
*
* @throws LockReleasedException
*/
private void waitUtilCanWrite() throws LockReleasedException {
synchronized (lock) {
testSyncOk();
//等待直到可以加写锁
while (!canAddWriteLock()) {
try {
lock.wait();
} catch (InterruptedException e) {
logger.warn(e, "线程等待失败");
}
//重新测试锁状态
testSyncOk();
}
}
}

/**
* 是否可以加读锁
*
* @return
*/
private boolean canAddWriteLock() {
return isWriteLockUnlock() && isReadLockUnlock();
}

/**
* 是否读锁还没有锁定
*
* @return
*/
private boolean isReadLockUnlock() {
return sharedThreads.isEmpty()
|| (sharedThreads.size() == 1 && sharedThreads.get(0).getThread() == Thread
.currentThread());
}

/**
* 写锁是否未锁
*
* @return
*/
private boolean isWriteLockUnlock() {
synchronized (lock) {
return (writeThreads == null)
|| (writeThreads.getThread().equals(Thread.currentThread()));
}
}

/**
* 获取线程therad对应的refCount
*
* @param refCounts
* @param thread
* @return
*/
private ThreadRefCount getRefCount(List<ThreadRefCount> refCounts, Thread thread) {
ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0);
int index = 0;
synchronized (lock) {

if ((index = refCounts.indexOf(refCount)) == -1) {
refCount = null;
} else {
refCount = refCounts.get(index);
}
return refCount;
}
}

/**
* 获取或者创建线程的加锁信息
*
* @param refCounts
* @param thread
* @return
*/
private ThreadRefCount getOrCreateRef(List<ThreadRefCount> refCounts, Thread thread) {
ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0);
int index = 0;
synchronized (lock) {

if ((index = refCounts.indexOf(refCount)) == -1) {
refCounts.add(refCount);
} else {
refCount = refCounts.get(index);
}
return refCount;
}

}

/**
* 确保锁没有被释放
*
* @throws LockReleasedException
*/
public void testSyncOk() throws LockReleasedException {
synchronized (lock) {
if (released) {
throw new LockReleasedException();
}
}
}

/**
* 计数线程锁定的次数
*
* @author jeff
* @version $Id: Sync.java, v 0.1 2014年3月17日 下午5:45:26 jeff Exp $
*/
private class ThreadRefCount {
/** 锁定的线程 */
private Thread thread;
/** 锁定的次数 */
private int    refCount = 0;

public ThreadRefCount(Thread thread, int refCount) {
this.thread = thread;
this.refCount = refCount;
}

/**
* 增加引用计数
*/
public void incRefCount() {
refCount++;
}

/**
* 减少引用计数
*/
public void decRefCount() {
refCount--;
}

/**
* Getter method for property <tt>thread</tt>.
*
* @return property value of thread
*/
public Thread getThread() {
return thread;
}

/**
* Setter method for property <tt>thread</tt>.
*
* @param thread value to be assigned to property thread
*/
@SuppressWarnings("unused")
public void setThread(Thread thread) {
this.thread = thread;
}

/**
* Getter method for property <tt>refCount</tt>.
*
* @return property value of refCount
*/
public int getRefCount() {
return refCount;
}

/**
* Setter method for property <tt>refCount</tt>.
*
* @param refCount value to be assigned to property refCount
*/
@SuppressWarnings("unused")
public void setRefCount(int refCount) {
this.refCount = refCount;
}

/**
* @see java.lang.Object#hashCode()
*/
public int hashCode() {
return thread.hashCode();
}

/**
* @see java.lang.Object#equals(java.lang.Object)
*/
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ThreadRefCount)) {
return false;
}
return thread.equals(((ThreadRefCount) other).getThread());
}

}

}


封装后

org.footoo.storage.transaction.lock.RandomSyncRWLock

/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;

import org.footoo.common.log.Logger;
import org.footoo.common.log.LoggerFactory;

/**
* 读写锁,实际只是封装了RandomSync
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:09:45 jeff Exp $
*/
public class RandomSyncRWLock implements RWLock {
/** 实际的同步工具 */
private RandomSync          sync           = new RandomSync();
/** 读锁 */
private Lock                readLockInner  = new ReadLock();
/** 写锁 */
private Lock                writeLockInner = new WriteLock();
/** 日志工具 */
private static final Logger logger         = LoggerFactory.getLogger(RandomSyncRWLock.class);

/**
* @see org.footoo.storage.transaction.lock.RWLock#readLock()
*/
@Override
public Lock readLock() {
return readLockInner;
}

/**
* @see org.footoo.storage.transaction.lock.RWLock#writeLock()
*/
@Override
public Lock writeLock() {
return writeLockInner;
}

/**
* @see org.footoo.storage.transaction.lock.RWLock#releaseSelf()
*/
@Override
public void releaseSelf() {
try {
sync.releaseSelf();
} catch (LockReleasedException e) {
logger.warn(e, "释放当前线程的读写锁发生异常");

}
}

/**
* @see org.footoo.storage.transaction.lock.RWLock#release()
*/
@Override
public void release() {
try {
sync.release();
} catch (LockReleasedException e) {
logger.error(e, "释放读写锁发生异常");
}
}

/**
* 读锁
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:12:42 jeff Exp $
*/
private class ReadLock implements Lock {

@Override
public void lock() {
try {
sync.lockShared();
} catch (LockReleasedException e) {
logger.warn(e);
}
}

@Override
public void unlock() {
try {
sync.unlockShared();
} catch (LockReleasedException | IllegalStateLockException e) {
logger.warn(e);
}
}

@Override
public boolean tryLock() {
try {
return sync.tryLockShared();
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}

@Override
public boolean tryLock(int ms) {
try {
return sync.tryLockShared(ms);
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}

}

/**
* 写锁
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:13:40 jeff Exp $
*/
private class WriteLock implements Lock {

@Override
public void lock() {
try {
sync.lockWrite();
} catch (LockReleasedException e) {
logger.warn(e);
}
}

@Override
public void unlock() {
try {
sync.unlockWrite();
} catch (LockReleasedException | IllegalStateLockException e) {
logger.warn(e);

}
}

@Override
public boolean tryLock() {
try {
return sync.tryLockWrite();
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}

@Override
public boolean tryLock(int ms) {
try {
return sync.tryLockWrite(ms);
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}

}

}
所有接口

org.footoo.storage.transaction.lock.Lock

/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;

/**
* 锁
*
* @author jeff
* @version $Id: Rock.java, v 0.1 2014年3月14日 下午7:10:46 jeff Exp $
*/
public interface Lock {
/**
* 锁定
*/
public void lock();

/**
* 解锁
*/
public void unlock();

/**
* 尝试加锁,立即返回
*
* @return
*/
public boolean tryLock();

/**
* 尝试加锁,ms毫秒后超时
*
* @param ms
* @return
*/
public boolean tryLock(int ms);
}


org.footoo.storage.transaction.lock.RWLock

/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;

/**
* 读写锁
* <ul>
* <li>能够实现基本的读写锁功能
* <li>同线程的读写锁不会阻塞
* </ul>
*
* @author jeff
* @version $Id: RWRock.java, v 0.1 2014年3月14日 下午7:10:05 jeff Exp $
*/
public interface RWLock {
/**
* 获得读锁
*
* @return
*/
public Lock readLock();

/**
* 写锁
*
* @return
*/
public Lock writeLock();

/**
* 释放当前线程的所有读写锁
*/
public void releaseSelf();

/**
* 释放锁
*/
public void release();

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐