您的位置:首页 > 数据库 > Redis

分布式学习笔记-基于lua实现Redis分布式锁

qq_20003891 2019-07-10 07:15 190 查看 https://blog.csdn.net/qq_20003

redis分布式锁的底层原理

我们先通过一张图来对Redis分布式锁有一个只管的感受

  1. 加锁的机制
    我们先来看上面的流程图,现在客户端1需要加锁,通过jedis访问redis资源,然后发送lua脚本到redis上,通过执行lua脚本的返回值来确定是否加锁成功,如果没有加锁成功那么客户端需要通过循环不断尝试加锁直到加锁成功为止。
    2.为什么选择lua脚本,而不是jedis提供的操作
    因为lua脚本里面封装了一大推复杂的业务逻辑,通过lua脚本发送到redis中可以保证这些复杂的业务逻辑的执行时原子性的,jedis提供的操作并不能保证原子性操作。
// 加锁的lua脚本
String lockScript = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//可重入锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//返回key剩余的超时时间
"return redis.call('pttl', KEYS[1]);";

KEYS[1]:表示你加锁的key值,就是你打算存在在redis的key。ARGV[1])表示超时的时间,这里是毫秒。ARGV[2]代表客户端的value值一个UUID值。我们使用hset来存储大概的格式是12345555555555 1(重入锁重入一次的时候存储的数据是12345555555555 2)
3. redis锁的互斥机制
客户端1已经加锁key=“redis_key”,此时客户端2来也来尝试加锁,那么redis中的lua脚本将会执行如下的逻辑。
第一个if会判断key="redis_key"是否存在发现这个锁已经存在,接着执行第二个if程序,判断redis_key中存储的value值是否相同,如果存在的话就是重入锁,现在这里不是。因此客户端2会得到redis_key的剩余超时时间,此时客户端2会进入while循环中,不断尝试执行上锁的过程,直到加锁成功。
大概的流程就是这样。还是比较简单。
4.为什么需要后台线程给key动态续命?
我们在设置key的时候都会给他一个超时的时间,既然已经给key设置了超时时间,为什么还需要给key续命呢?这不是自相矛盾的操作吗?其实不然,设置key的超时时间是防止key永远存在在redis集群中,造成其他的客户端无法加锁。续命是为了应对额外的场景,可能也是大对数人在使用redis分布式锁都会遇到的问题。加锁客户端1加锁key=“redis_test”,设置超时时间2000毫秒,客户端1拿到锁之后正常执行逻辑,但是整个逻辑执行完到释放锁需要5000毫秒,因为设置key="redis_test"的超时时间2000毫秒,显然客户端2在2001毫秒可以拿到锁,因为超时锁被自动删除了。但是,客户端1的逻辑并没有执行完,现在客户端2又拿到锁开始执行自己的逻辑,这与我们初衷是违背的,这种场景下动态续命就很重要了。
5代码
我们使用jedis来操作redis.RedisLock初始化的时候需要调用init(),启动后台续命线程。

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
package com.panda.practice.project.lock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* @author rewnei2
* @version v0.1 2019/1/12 11:15
*/
public class RedisLock {
//存储redis缓存的key和value,expireTime,用于动态的给key,value续命
private Map<String, LockNode> cacheMap = new ConcurrentHashMap<String, LockNode>();

//判断初始化函数是否已经被执行
private boolean inited = false;

private Object lock = new Object();
//重入锁释放一个资源
private static final Long REENTRY_LOCK_RELEASE = 0L;

//分布式锁释放成功
private static final Long LOCK_RELEASE_SUCCESS = 1L;

private static JedisPool jedisPool;//非切片连接池

/**
* 初始化redis资源池
*/
static {
// 池基本配置
JedisPoolConfig config = new JedisPoolConfig();
//
config.setMaxTotal(5);
//config.setMaxIdle(5);
config.setMaxWaitMillis(1000 * 100);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
jedisPool = new JedisPool(config, "127.0.0.1", 6379, 2000);
}

/**
* 获取分布式锁
*
* @param key        分布式锁的key
* @param value      对应的value
* @param expireTime 超时时间
* @return
*/
public void tryLock(String key, String value, long expireTime) {
Jedis jedis = null;
try {
String lockScript = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//可重入锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//返回key剩余的超时时间

20000
"return redis.call('pttl', KEYS[1]);";
List<String> keys = new ArrayList<String>();
keys.add(key);
List<String> args = new ArrayList<String>();
args.add(expireTime + "");
args.add(value);
jedis = jedisPool.getResource();
boolean flag = true;
int counter = 1;
while (flag) {
Object obj = jedis.eval(lockScript, keys, args);
if (obj != null) {//当前锁被其他程序占用
try {
//线程休眠200毫秒
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(String.format("线程%s休眠,key=%s,value=%s没有竞争到锁", Thread.currentThread().getName(), key, value) + "counter" + counter++);
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = false;
System.out.println(String.format("线程%s拿到了redis锁key=%s,value=%s", Thread.currentThread().getName(), key, value));
LockNode lockNode = new LockNode();
lockNode.setValue(value);
lockNode.setExpireTime(expireTime + "");
cacheMap.put(key, lockNode);
}
} finally {
//释放资源到连接池
returnResource(jedis);
}
}

public void returnResource(Jedis jedis) {
if (jedis != null) {
jedis.close();
}
}

/**
* 释放锁,重入锁的时候减去1,当hset中value等于0的时候删除key
*
* @param key
* @param value
* @param expireTime
*/
public void releaseLock(String key, String value, long expireTime) {
Jedis jedis = null;
try {
String releaseLock = "if (redis.call('exists', KEYS[1]) == 0) then " +
//"redis.call('publish', KEYS[2], ARGV[1]); " +
"return nil; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
//"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;";
List<String> keys = new ArrayList<String>();
keys.add(key);
List<String> args = new ArrayList<String>();
args.add(expireTime + "");
args.add(value);
jedis = jedisPool.getResource();
Object result = jedis.eval(releaseLock, keys, args);
if (result == null) {
System.out.println("当前的值不存在");
return;
}
Long resultNum = (Long) result;
if (result != null && LOCK_RELEASE_SUCCESS.equals(resultNum)) {
cacheMap.remove(key);
System.out.println(String.format("redis分布式锁释放成功key=%s,value=%s", key, value));
}
if (result != null && REENTRY_LOCK_RELEASE.equals(resultNum)) {
System.out.println(String.format("redis分布式可重入锁释放一个资源成功key=%s,value=%s", key, value));
}
} finally {
returnResource(jedis);
}

}

public void getHsetVales(String key) {
Jedis jedis = jedisPool.getResource();
Map<String, String> map = jedis.hgetAll(key);
if (map == null || map.size() == 0) {
System.out.println(String.format("当前key=%s没有缓存数据", key));
} else {
System.out.println(jedis.hgetAll(key));
}
}

public void destroy() {
jedisPool.destroy();
}

/**
* 启动一个后台线程用于动态给key、value续命,知道释放锁的时候删除这个key-value数据
*/
public void init() {
synchronized (lock) {
if (!inited) {
new Thread(new Runnable() {
public void run() {
while (true) {
try {
Thread.sleep(1733);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (cacheMap != null && cacheMap.size() > 0) {
Jedis jedis = jedisPool.getResource();
try {
extendLockTime(jedis);
} finally {
returnResource(jedis);
}
}
}
}
}, "RedisLock init()").start();
inited = true;

}
}
}

private void extendLockTime(Jedis jedis) {
String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
for (Map.Entry<String, LockNode> entry : cacheMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue().getValue();
String expireTime = entry.getValue().getExpireTime();
List<String> keys = new ArrayList<String>();
keys.add(key);
List<String> args = new ArrayList<String>();
args.add(expireTime);
args.add(value);
Object obj = jedis.eval(script, keys, args);
if (obj != null) {
Long result = (Long) obj;
if (LOCK_RELEASE_SUCCESS.equals(result)) {
System.out.println(String.format("给redis中分布式锁key=%s,value=%s续命时间%s毫秒", key, value, expireTime));
} else {
System.out.println(String.format("error给redis中分布式锁key=%s,value=%s续命时间%s毫秒", key, value, expireTime));
}
}
}
//returnResource(jedis);
}

static class LockNode implements Serializable {
//存储redis的value值
private String value;

//key的超时时间,毫秒
private String expireTime;

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public String getExpireTime() {
return expireTime;
}

public void setExpireTime(String expireTime) {
this.expireTime = expireTime;
}

@Override
public String toString() {
return "LockNode{" +
"value='" + value + '\'' +
", expireTime='" + expireTime + '\'' +
'}';
}
}

}

5最后
现在使用一个Map来存储key,通过 便利Map的方式来动态续命,其实这并不是一个很好的办法,还有很多优化的地方。由于本人水平有限,本文难免有不足的地方,请各位批评指正。

标签: 
相关文章推荐