您的位置:首页 > 数据库 > Redis

Redis分布式锁

2016-05-04 11:14 567 查看
该分布式锁仅仅供思路参考,实际上该方法用在生产环境用还是有一定的问题,请使用原生的redisson分布式锁来解决生产环境中分布式应用锁的问题。https://github.com/redisson/

以下仅供参考思路,不要应用在生产环境中

参考原文来自博客园

本文是参考其他博客的思路和版本使用java代码改写的java版本。直接上代码,然后再进行分析。

php版本

/**
* 加锁
* @param  [type]  $name           锁的标识名
* @param  integer $timeout        循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待
* @param  integer $expire         当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
* @param  integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
* @return [type]                  [description]
*/
public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
if ($name == null) return false;

//取得当前时间
$now = time();
//获取锁失败时的等待超时时刻
$timeoutAt = $now + $timeout;
//锁的最大生存时刻
$expireAt = $now + $expire;

$redisKey = "Lock:{$name}";
while (true) {
//将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
$result = $this->redisString->setnx($redisKey, $expireAt);

if ($result != false) {
//设置key的失效时间
$this->redisString->expire($redisKey, $expireAt);
//将锁标志放到lockedNames数组里
$this->lockedNames[$name] = $expireAt;
return true;
}

//以秒为单位,返回给定key的剩余生存时间
$ttl = $this->redisString->ttl($redisKey);

//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
//如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
//这时可以直接设置expire并把锁纳为己用
if ($ttl < 0) {
$this->redisString->set($redisKey, $expireAt);
$this->lockedNames[$name] = $expireAt;
return true;
}

/*****循环请求锁部分*****/
//如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

//隔 $waitIntervalUs 后继续 请求
usleep($waitIntervalUs);

}
return false;
}


上面是php版本的加锁方式,最开始我直接将上面版本使用java改写。 然后部署双机进行并发测试。我将分布式锁加到一个发礼包码的功能上面,该功能大概需求是这样的:用户点击页面按钮,服务器收到用户请求返回一个礼包码给用户,每个用户只能抢一次,每个礼包码都是唯一的不用重复。测试工具使用的是Jmeter, 每次启动100个线程,然后进行模拟用户点击请求发礼包码接口。线程请求完毕之后,看数据发现会有少量用户重复了。于是我添加了跟踪日志,在用户每次获取锁返回的地方,打印当前的参数与锁是如何获取到的。不难发现获取锁总共有两种途径,一种是setnx时返回true,另外就是

ttl < 0 的时候,ttl<0 说明锁key当前为设置过期时间,那么下面我们就来分析下锁key未设置过期时间的两种情况。

1. 线程A 在调用setnx之后系统宕机,未来得及调用expire方法,B线程进入的时候发现ttl<0;

2. 线程A 在调用setnx之后,未来的及调用expire方法,B线程等待的一个循环中刚好调用ttl方法,此时A线程的expire方法还未调用,B线程发现ttl小于0,因此此时两个线程同时获取了锁。

因为 setnx与与expire 是非原子性操作,所以很难保证在ttl<0的时候(即在setnx和expire之间其它线程进入ttl < 0 的流程。因此在ttl<0 的时候不能直接调用set方法来占用锁,而是应该先取出锁的值进行超时判断,由于最开始的时候setnx的值保存的是锁的有效时间,此时可以拿出锁的值进行判断是否超时,如果超时则说明该锁未设置过期时间是因为系统宕机引起的(锁的超时时间一般以秒为单位,系统宕机再启动肯定已经超时)。 如果锁的值未超时则是其他线程在调用setnx时未来的及调用expire方法。最后Java版本如下。

/**
* 加锁
*
* @param name
*            锁的表示名称
* @param timeout
*            循环获取锁的等待超时时间,在此时间内会一致尝试
* @param expire
*            当前锁的最大生存时间(秒)
* @param waitIntervalUs
*/
public boolean lock(String name, Integer timeout, Integer expire,
Long waitIntervalUs) {
if (StringUtils.isBlank(name)) {
return false;
}
Long newTime = System.currentTimeMillis();
Long timeoutAt = newTime + timeout * 1000L;

Long expireAt = newTime + expire * 1000L;

String lockKey = getRedisKey(name);
int count = 0;
while (true) {
if(logger.isDebugEnabled()) {
logger.debug(
"count: {}",count++);
}
Long result = redisClientTemplate.setnx(lockKey,
String.valueOf(expireAt));
if (result != null && result == 1) {
redisClientTemplate.expire(lockKey, expire);
lockedNames.put(name, expireAt);
if(logger.isDebugEnabled()) {
logger.debug(
"setnx success, get lockKey:{}, newTime:{}, expireAt:{}",
lockKey, newTime, expireAt);
}
return true;
}
Long tll = redisClientTemplate.ttl(lockKey);
// ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
// 如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
// 这时可以直接设置expire并把锁纳为己用
if (tll < 0) {
String expireAtTime = redisClientTemplate.get(lockKey);
// 判断是否存在值,如果存在根据该值判断是刚刚加入进去的新值,还是系统挂掉导致expire未更新的旧值
if (StringUtils.isNotBlank(expireAtTime)) {
Long expireAtSime = Long.valueOf(expireAtTime);
// 判断set进去的值是否小于当前系统时间,如果小于的话说明是上面在调用expire之前系统crash掉,已经过期
if (expireAtSime < System.currentTimeMillis()) {
redisClientTemplate.setex(lockKey, expire,
String.valueOf(expireAt));
lockedNames.put(name, expireAt);
if(logger.isDebugEnabled()) {
logger.debug(
"tll < 0 and expireAtSime < system time, get lockKey:{}, newTime:{}, expireAt:{},old expireat:{}",
lockKey, newTime, expireAt, expireAtSime);
}
return true;
}
}else {
result = redisClientTemplate.setnx(lockKey,
String.valueOf(expireAt));
if (result != null && result == 1) {
redisClientTemplate.expire(lockKey, expire);
lockedNames.put(name, expireAt);
if(logger.isDebugEnabled()) {
logger.debug(
"tll < 0 and setnx success, get lockKey:{}, newTime:{}, expireAt:{}",
lockKey, newTime, expireAt);
}
return true;
}
}
}
/***** 循环请求锁部分 *****/
// 如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
if (timeout <= 0 || timeoutAt < System.currentTimeMillis()) {
if(logger.isDebugEnabled()) {
logger.debug("请求超时退出");
}
break;
}

// 隔 $waitIntervalUs 后继续 请求
try {
Thread.sleep(waitIntervalUs);
} catch (InterruptedException e) {
logger.error("thread sleep error: ", e);
}
}
return false;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: