分布式学习笔记-基于lua实现Redis分布式锁
redis分布式锁的底层原理
我们先通过一张图来对Redis分布式锁有一个只管的感受
- 加锁的机制
我们先来看上面的流程图,现在客户端1需要加锁,通过jedis访问redis资源,然后发送lua脚本到redis上,通过执行lua脚本的返回值来确定是否加锁成功,如果没有加锁成功那么客户端需要通过循环不断尝试加锁直到加锁成功为止。
2.为什么选择lua脚本,而不是jedis提供的操作
因为lua脚本里面封装了一大推复杂的业务逻辑,通过lua脚本发送到redis中可以保证这些复杂的业务逻辑的执行时原子性的,jedis提供的操作并不能保证原子性操作。
// 加锁的lua脚本 String lockScript = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //可重入锁 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //返回key剩余的超时时间 "return redis.call('pttl', KEYS[1]);";
KEYS[1]:表示你加锁的key值,就是你打算存在在redis的key。ARGV[1])表示超时的时间,这里是毫秒。ARGV[2]代表客户端的value值一个UUID值。我们使用hset来存储大概的格式是12345555555555 1(重入锁重入一次的时候存储的数据是12345555555555 2)
3. redis锁的互斥机制
客户端1已经加锁key=“redis_key”,此时客户端2来也来尝试加锁,那么redis中的lua脚本将会执行如下的逻辑。
第一个if会判断key="redis_key"是否存在发现这个锁已经存在,接着执行第二个if程序,判断redis_key中存储的value值是否相同,如果存在的话就是重入锁,现在这里不是。因此客户端2会得到redis_key的剩余超时时间,此时客户端2会进入while循环中,不断尝试执行上锁的过程,直到加锁成功。
大概的流程就是这样。还是比较简单。
4.为什么需要后台线程给key动态续命?
我们在设置key的时候都会给他一个超时的时间,既然已经给key设置了超时时间,为什么还需要给key续命呢?这不是自相矛盾的操作吗?其实不然,设置key的超时时间是防止key永远存在在redis集群中,造成其他的客户端无法加锁。续命是为了应对额外的场景,可能也是大对数人在使用redis分布式锁都会遇到的问题。加锁客户端1加锁key=“redis_test”,设置超时时间2000毫秒,客户端1拿到锁之后正常执行逻辑,但是整个逻辑执行完到释放锁需要5000毫秒,因为设置key="redis_test"的超时时间2000毫秒,显然客户端2在2001毫秒可以拿到锁,因为超时锁被自动删除了。但是,客户端1的逻辑并没有执行完,现在客户端2又拿到锁开始执行自己的逻辑,这与我们初衷是违背的,这种场景下动态续命就很重要了。
5代码
我们使用jedis来操作redis.RedisLock初始化的时候需要调用init(),启动后台续命线程。
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
package com.panda.practice.project.lock; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author rewnei2 * @version v0.1 2019/1/12 11:15 */ public class RedisLock { //存储redis缓存的key和value,expireTime,用于动态的给key,value续命 private Map<String, LockNode> cacheMap = new ConcurrentHashMap<String, LockNode>(); //判断初始化函数是否已经被执行 private boolean inited = false; private Object lock = new Object(); //重入锁释放一个资源 private static final Long REENTRY_LOCK_RELEASE = 0L; //分布式锁释放成功 private static final Long LOCK_RELEASE_SUCCESS = 1L; private static JedisPool jedisPool;//非切片连接池 /** * 初始化redis资源池 */ static { // 池基本配置 JedisPoolConfig config = new JedisPoolConfig(); // config.setMaxTotal(5); //config.setMaxIdle(5); config.setMaxWaitMillis(1000 * 100); config.setTestOnBorrow(true); config.setTestOnReturn(true); jedisPool = new JedisPool(config, "127.0.0.1", 6379, 2000); } /** * 获取分布式锁 * * @param key 分布式锁的key * @param value 对应的value * @param expireTime 超时时间 * @return */ public void tryLock(String key, String value, long expireTime) { Jedis jedis = null; try { String lockScript = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //可重入锁 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //返回key剩余的超时时间 20000 "return redis.call('pttl', KEYS[1]);"; List<String> keys = new ArrayList<String>(); keys.add(key); List<String> args = new ArrayList<String>(); args.add(expireTime + ""); args.add(value); jedis = jedisPool.getResource(); boolean flag = true; int counter = 1; while (flag) { Object obj = jedis.eval(lockScript, keys, args); if (obj != null) {//当前锁被其他程序占用 try { //线程休眠200毫秒 TimeUnit.MILLISECONDS.sleep(200); System.out.println(String.format("线程%s休眠,key=%s,value=%s没有竞争到锁", Thread.currentThread().getName(), key, value) + "counter" + counter++); continue; } catch (InterruptedException e) { e.printStackTrace(); } } flag = false; System.out.println(String.format("线程%s拿到了redis锁key=%s,value=%s", Thread.currentThread().getName(), key, value)); LockNode lockNode = new LockNode(); lockNode.setValue(value); lockNode.setExpireTime(expireTime + ""); cacheMap.put(key, lockNode); } } finally { //释放资源到连接池 returnResource(jedis); } } public void returnResource(Jedis jedis) { if (jedis != null) { jedis.close(); } } /** * 释放锁,重入锁的时候减去1,当hset中value等于0的时候删除key * * @param key * @param value * @param expireTime */ public void releaseLock(String key, String value, long expireTime) { Jedis jedis = null; try { String releaseLock = "if (redis.call('exists', KEYS[1]) == 0) then " + //"redis.call('publish', KEYS[2], ARGV[1]); " + "return nil; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + //"redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;"; List<String> keys = new ArrayList<String>(); keys.add(key); List<String> args = new ArrayList<String>(); args.add(expireTime + ""); args.add(value); jedis = jedisPool.getResource(); Object result = jedis.eval(releaseLock, keys, args); if (result == null) { System.out.println("当前的值不存在"); return; } Long resultNum = (Long) result; if (result != null && LOCK_RELEASE_SUCCESS.equals(resultNum)) { cacheMap.remove(key); System.out.println(String.format("redis分布式锁释放成功key=%s,value=%s", key, value)); } if (result != null && REENTRY_LOCK_RELEASE.equals(resultNum)) { System.out.println(String.format("redis分布式可重入锁释放一个资源成功key=%s,value=%s", key, value)); } } finally { returnResource(jedis); } } public void getHsetVales(String key) { Jedis jedis = jedisPool.getResource(); Map<String, String> map = jedis.hgetAll(key); if (map == null || map.size() == 0) { System.out.println(String.format("当前key=%s没有缓存数据", key)); } else { System.out.println(jedis.hgetAll(key)); } } public void destroy() { jedisPool.destroy(); } /** * 启动一个后台线程用于动态给key、value续命,知道释放锁的时候删除这个key-value数据 */ public void init() { synchronized (lock) { if (!inited) { new Thread(new Runnable() { public void run() { while (true) { try { Thread.sleep(1733); } catch (InterruptedException e) { e.printStackTrace(); } if (cacheMap != null && cacheMap.size() > 0) { Jedis jedis = jedisPool.getResource(); try { extendLockTime(jedis); } finally { returnResource(jedis); } } } } }, "RedisLock init()").start(); inited = true; } } } private void extendLockTime(Jedis jedis) { String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; for (Map.Entry<String, LockNode> entry : cacheMap.entrySet()) { String key = entry.getKey(); String value = entry.getValue().getValue(); String expireTime = entry.getValue().getExpireTime(); List<String> keys = new ArrayList<String>(); keys.add(key); List<String> args = new ArrayList<String>(); args.add(expireTime); args.add(value); Object obj = jedis.eval(script, keys, args); if (obj != null) { Long result = (Long) obj; if (LOCK_RELEASE_SUCCESS.equals(result)) { System.out.println(String.format("给redis中分布式锁key=%s,value=%s续命时间%s毫秒", key, value, expireTime)); } else { System.out.println(String.format("error给redis中分布式锁key=%s,value=%s续命时间%s毫秒", key, value, expireTime)); } } } //returnResource(jedis); } static class LockNode implements Serializable { //存储redis的value值 private String value; //key的超时时间,毫秒 private String expireTime; public String getValue() { return value; } public void setValue(String value) { this.value = value; } public String getExpireTime() { return expireTime; } public void setExpireTime(String expireTime) { this.expireTime = expireTime; } @Override public String toString() { return "LockNode{" + "value='" + value + '\'' + ", expireTime='" + expireTime + '\'' + '}'; } } }
5最后
现在使用一个Map来存储key,通过 便利Map的方式来动态续命,其实这并不是一个很好的办法,还有很多优化的地方。由于本人水平有限,本文难免有不足的地方,请各位批评指正。
- JAVA学习笔记(十)基于LinkedList实现栈和队列
- 基于JAVA的水泥三维图像特征提取系统设计与实现 之一 JAVA 3D学习笔记
- 学习Java框架的笔记(Spring AOP)简介、动态代理、基于代理类的AOP实现、AspectJ开发
- 【电信增值业务学习笔记】9基于智能网的增值业务实现技术和应用
- Lua学习笔记5:类及继承的实现
- 深度学习笔记-第4章-《深度学习入门——基于Python的理论与实现》
- 《设计模式:基于C#的工程化实现及扩展》学习笔记--目录
- lua学习笔记 2 android调用Lua。Lua脚本中实现添加Button,并为Button增加Listener
- [算法学习笔记]基于最大堆实现最大优先队列
- 学习笔记TF035:实现基于LSTM语言模型
- 《21个项目玩转深度学习--基于tensorflow的实践详解》代码实现和笔记(一)
- Django框架学习笔记(10.基于ORM实现简单的用户登录)
- (学习笔记二)——基于opencv人脸检测原理及实现
- WinSock网络编程学习笔记(九):基于UDP实现DayTime协议
- List的基本操作实战与基于模式匹配的List排序算法实现之Scala学习笔记-23
- 一个无聊男人的疯狂《数据结构与算法分析-C++描述》学习笔记 用C++/lua/python/bash的四重实现(3) 最大子序列和问题
- [原创]java WEB学习笔记108:Spring学习---基于配置文件的形式实现AOP
- 使用Nginx+Lua实现的WAF - 学习笔记
- cocos-lua学习笔记(五)cocos2d-Lua类的实现
- 基于Redis Lua脚本实现的分布式锁 | 日拱一卒