您的位置:首页 > 数据库 > Redis

基于Redis的分布式锁实现

2016-05-16 18:41 615 查看

Redis锁

Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。

SETNX命令(SET if Not eXists)语法:

SETNX key value

若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

安全性:保证互斥,在任何时候,只有一个客户端可以持有锁

无死锁:即使当前持有锁的客户端崩溃或者从集群中被分开了,其它客户端最终总是能够获得锁。

容错性:只要大部分的 Redis 节点在线,那么客户端就能够获取和释放锁。

加锁:

if (conn.setnx(“lock","1").equals(1L)) {
return true;
}
return false;


解锁

conn.del(“lock”);


存在问题: 客户端崩溃或其他原因导致解锁失败时,其他客户端将再也无法获取到锁,造成死锁。

SET resource_name my_random_value NX PX 30000


命令中的NX表示如果Key不存在就添加,存在则直接返回。

PX表示以毫秒为单位设置key的过期时间,这里是30000ms。

设置过期时间是防止获得锁的客户端突然崩溃掉或其他异常情况,导致Redis中的对象锁一直无法释放,造成死锁。Key的值需要在所有请求锁服务的客户端中,确保是个唯一值。 这是为了保证拿到锁的客户端能安全释放锁,防止这个锁对象被其他客户端删除。

分布式锁

举例: A客户端拿到对象锁,但在因为一些原因被阻塞导致无法及时释放锁。 因为过期时间已到,Redis中的锁对象被删除。

B客户端请求获取锁成功。

C客户端请求获取锁成功。

这时B、C都拿到了锁,因此分布式锁失效。

加锁:

public static String lock(String lockName, long lockTimeout) {
String identifier = UUID.randomUUID().toString();
if (redis.setnx(lockName, identifier).equals(1L)) {
redis.pexpire(lockName, lockTimeout);
} else if (redis.ttl(lockName).equals(-1L)){
redis.pexpire(lockName, lockTimeout);
}
return null;
}


解锁:

public static void unlock(String lockName, String identifier) {
if (identifier.equals(redis.get(lockName))) {
redis.del(lockName);
}
}


加锁 参数含义: KEYS[1] :需要加锁的key,这里需要是字符串类型。

ARGV[1] :锁的超时时间,防止死锁

ARGV[2] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId

// 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1

if (redis.call('exists', KEYS[1]) == 0)
then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end; // 如果锁重入,需要判断锁的key field 都一致情况下 value 加一
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);//锁重入重新设置超时时间
return nil;
end; // 返回剩余的过期时间
return redis.call('pttl', KEYS[1]);


解锁

参数:

– KEYS[1] :需要加锁的key,这里需要是字符串类型。

– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个

channelName:“redisson_lock__channel__{” + getName() + “}”

– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

– ARGV[2] :锁的超时时间,防止死锁 – ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId

// 如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息

if (redis.call('exists', KEYS[1]) == 0)
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end; // key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;
end;


// 将value减1

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // 如果counter>0说明锁在重入,不能删除key
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else // 删除key并且publish 解锁消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;


高性能 加锁、解锁部分使用LUA封装,确保原子性。

使用PUB/SUB消息机制,减少申请锁时的等待时间 安全 锁带有超时时间、锁的标识唯一,防止死锁 锁设计为可重入,避免死锁

可重入锁指的是在一个线程中可以多次获取同一把锁,如Java中的 ReentrantLock和 synchronized关键字

public void get() {
lock.lock();
set();
lock.unlock();
}

public void set() {
lock.lock();
lock.unlock();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: