Redis实现分布式锁的正确姿势
1. 前言
分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于Zookeeper的分布式锁。本篇详细介绍如何正确地实现Redis分布式锁。
2. 分布式锁的实现要点
为了确保分布式锁可用,我们至少需要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动释放锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
3. Redis实现分布式锁的错误姿势
在讲解使用redis实现分布式锁的正确姿势前,先来看下错误的实现方式。
3.1 加锁的错误姿势
首先,为了保证互斥性和不会发生死锁两个条件,所以我们在加锁操作的时候,需要使用
SETNX指令来保证互斥性——只有一个客户端能够持有锁。为了保证不会发生死锁,需要给锁加一个过期时间,这样就可以保证即使持有锁的客户端崩溃了也不会一直不释放锁。
为了保证这两个条件,有些错误的实现会用如下代码来实现加锁操作:
/** * 实现加锁的错误姿势 * @param jedis * @param lockKey * @param requestId * @param expireTime */ public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) { Long result = jedis.setnx(lockKey, requestId); if (result == 1) { // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁 jedis.expire(lockKey, expireTime); } }
可能有些人没看出来以上实现加锁操作的错误原因。setnx和expire是两条指令,不具备原子性,如果程序在执行完setnx后突然崩溃,导致没有设置锁的过期时间,从而就导致死锁了,其他客户端会一直阻塞。
针对该代码的正确姿势应该保证setnx和expire原子性。
实现加锁操作的错误姿势2。具体实现如下代码所示:
/** * 实现加锁的错误姿势2 * @param jedis * @param lockKey * @param expireTime * @return */ public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) { long expires = System.currentTimeMillis() + expireTime; String expiresStr = String.valueOf(expires); // 如果当前锁不存在,返回加锁成功 if (jedis.setnx(lockKey, expiresStr) == 1) { return true; } // 如果锁存在,获取锁的过期时间 String currentValueStr = jedis.get(lockKey); if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间 String oldValueStr = jedis.getSet(lockKey, expiresStr); if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁 return true; } } // 其他情况,一律返回加锁失败 return false; }
实现思路:使用jedis.setnx()命令实现加锁,其中key是锁,value是锁的过期时间。执行过程:1. 通过setnx()方法尝试加锁,如果当前锁不存在,返回加锁成功。2. 如果锁已经存在则获取锁的过期时间,和当前时间比较,如果锁已经过期,则设置新的过期时间,返回加锁成功。
这一种错误示例就比较难以发现问题,而且实现也比较复杂。这段代码的问题在于:
- 由于客户端自己生成过期时间,所以需要强制要求分布式环境下所有客户端的时间必须同步。
- 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,虽然最终只有一个客户端加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。
- 不具备加锁和解锁必须是同一个客户端的特性,即任何客户端都可以解锁。
解决上面这段代码的方式就是为每个客户端加锁添加一个唯一标识,以确保加锁和解锁操作是来自同一个客户端
3.2 解锁的错误姿势
分布式锁的实现无非就两个方法,一个加锁,一个解锁。下面来看下解锁的错误姿势。
错误姿势1:
/** * 解锁错误姿势1 * @param jedis * @param lockKey */ public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
上面实现是最简单直接的解锁方式,这种不先判断拥有者而直接解锁的方式,会导致任何客户端都可以随时解锁,即使这把锁不是它上锁的。
错误姿势2:
/** * 解锁错误姿势2 * @param jedis * @param lockKey * @param requestId */ public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是不是同一个客户端 if (requestId.equals(jedis.get(lockKey))) { // 若在此时,这把锁突然不是这个客户端的,则会误解锁 jedis.del(lockKey); }
既然错误姿势1中没有判断锁的拥有者,那姿势2中判断了拥有者,那错误原因又在哪里呢?答案还是在原子性上。因为判断和删除不是一个原子操作。在并发的时候很可能发生了解除了别的客户端加的锁。具体场景有:客户端A加锁,一段时间之后A进行解锁操作时,在执行jedis.del()之前,锁突然过期了,此事客户端B尝试加锁成功,然后A再执行del方法,则A将B的锁给解除了。从而不满足加锁和解锁必须是同一个客户端的性质。
解决思路就是需要保证GET和DEL操作原子性
4. Redis实现分布式锁的正确姿势
加锁操作的正确姿势为:
- 使用setnx命令保持互斥性;
- 需要设置锁的过期时间,避免死锁;
- setnx和过期时间需要保持原子性,避免在设置setnx成功之后再设置过期时间客户端崩溃导致死锁;
- 加锁的value值为一个唯一标识。可以采用UUID作为唯一标识。加锁成功后需要把唯一标识返回给客户端用来进行解锁。
解锁的正确姿势:
- 需要那加锁成功的唯一标识进行解锁,从而保证加锁和解锁的是用一个客户端;
- 解锁需要比较唯一标识是否相等,相等则再执行删除。这两个操作可以采用Lua脚本方式使两个命令的原子性。
Redis分布式锁实现的正确姿势的实现代码:
public interface DistributedLock { /** * 获取锁 * @return 锁标识 */ String acquire(); /** * 释放锁 * @param indentifier * @return */ boolean release(String indentifier); } /** * @Description * @created 2019/1/1 20:32 */ @Slf4j public class RedisDistributedLock implements DistributedLock{ private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * redis 客户端 */ private Jedis jedis; /** * 分布式锁的键值 */ private String lockKey; /** * 锁的超时时间 10s */ int expireTime = 10 * 1000; /** * 锁等待,防止线程饥饿 */ int acquireTimeout = 1 * 1000; /** * 获取指定键值的锁 * @param jedis jedis Redis客户端 * @param lockKey 锁的键值 */ public RedisDistributedLock(Jedis jedis, String lockKey) { this.jedis = jedis; this.lockKey = lockKey; } /** * 获取指定键值的锁,同时设置获取锁超时时间 * @param jedis jedis Redis客户端 * @param lockKey 锁的键值 * @param acquireTimeout 获取锁超时时间 */ public RedisDistributedLock(Jedis jedis,String lockKey, int acquireTimeout) { this.jedis = jedis; this.lockKey = lockKey; this.acquireTimeout = acquireTimeout; } /** * 获取指定键值的锁,同时设置获取锁超时时间和锁过期时间 * @param jedis jedis Redis客户端 * @param lockKey 锁的键值 * @param acquireTimeout 获取锁超时时间 * @param expireTime 锁失效时间 */ public RedisDistributedLock(Jedis jedis, String lockKey, int acquireTimeout, int expireTime) { this.jedis = jedis; this.lockKey = lockKey; this.acquireTimeout = acquireTimeout; this.expireTime = expireTime; } @Override public String acquire() { try { // 获取锁的超时时间,超过这个时间则放弃获取锁 long end = System.currentTimeMillis() + acquireTimeout; // 随机生成一个value String requireToken = UUID.randomUUID().toString(); while (System.currentTimeMillis() < end) { String result = jedis.set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return requireToken; } try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (Exception e) { log.error("acquire lock due to error", e); } return null; } @Override public boolean release(String identify) { if(identify == null){ return false; } String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = new Object(); try { result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(identify)); if (RELEASE_SUCCESS.equals(result)) { log.info("release lock success, requestToken:{}", identify); return true; } }catch (Exception e){ log.error("release lock due to error",e); }finally { if(jedis != null){ jedis.close(); } } log.info("release lock failed, requestToken:{}, result:{}", identify, result); return false; } } 下面就以秒杀库存数量为场景,测试下上面实现的分布式锁的效果。具体测试代码如下: public class RedisDistributedLockTest { static int n = 500; public static void secskill() { System.out.println(--n); } public static void main(String[] args) { Runnable runnable = () -> { RedisDistributedLock lock = null; String unLockIdentify = null; try { Jedis conn = new Jedis("127.0.0.1",6379); lock = new RedisDistributedLock(conn, "test1"); unLockIdentify = lock.acquire(); System.out.println(Thread.currentThread().getName() + "正在运行"); secskill(); } finally { if (lock != null) { lock.release(unLockIdentify); } } }; for (int i = 0; i < 10; i++) { Thread t = new Thread(runnable); t.start(); } } }
5. 总结
这样是不是已经完美使用Redis实现了分布式锁呢?答案是并没有结束。上面的实现代码只是针对单机的Redis没问题。但是现实生产中大部分是集群或者是主备的,上面的实现在集群或者主备情况下会有相应的问题,可以尝试使用Redisson实现分布式锁,这是Redis官方提供的Java组件。参考
- 使用Redis实现分布式锁的正确姿势
- 【分布式缓存系列】Redis实现分布式锁的正确姿势
- 【分布式缓存系列】Redis实现分布式锁的正确姿势
- Redis 分布式锁的正确实现方式( Java 版 )
- Redis 如何正确实现分布式锁
- Redis 分布式锁的正确实现方式
- java语言描述Redis分布式锁的正确实现方式
- Redis分布式锁的正确实现方式(Java版)
- Redis分布式锁的正确实现方式(Java版)
- 【分布式缓存系列】集群环境下Redis分布式锁的正确姿势
- 谈谈Redis分布式锁的正确实现方法
- Redis分布式锁的正确实现方式
- 一篇文章带你解读Redis分布式锁的发展史和正确实现方式
- Redis分布式锁的正确实现方式(Java版)
- 这才是实现分布式锁的正确姿势!
- Redis分布式锁的正确实现方式
- Redis 分布式锁的正确实现方式( Java 版 )
- Redis 分布式锁的正确实现方式( Java 版 )
- redis分布式锁的正确实现方式
- Redis 分布式锁的正确实现方式Java