您的位置:首页 > 数据库 > Redis

[redis分布式锁]redisson分布式锁的实现及spring-boot-starter封装

2020-01-13 18:55 495 查看

本文主要介绍redisson中对于可重入锁、读写锁、公平锁的实现,并利用spring AOP封装成基于方法级别的注解使用方式。

关于redisson的介绍及其spring boot starter的封装参考:spring boot redisson starter的封装和使用

redisson是一个非常强大的redis客户端,封装了很多针对分布式场景的工具,很多工具都使用了大量的Lua脚本来实现,其中分布式锁也是如此。需要明确的是: Redis使用单个Lua解释器去运行所有脚本,并且,Redis也保证脚本会以原子性(atomic)的方式执行,即:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行 。(摘自:EVAL-Redis命令参考

redisson中的分布式锁实现了jdk中锁的规范,顶层接口主要是RLock和RReadWriteLock,分别继承自jdk中的Lock和ReadWriteLock,redisson中的代码结构如下:

  • RLock接口实现类的代码结构

  • RReadWriteLock接口实现类的代码结构

从类名上可以看出各种锁对应的类

可重入锁 RedissonLock

可重入锁是一种特殊的互斥锁,同一个线程可以重复获取到锁而不会阻塞(相应的,多次获取之后也需要相等次数的释放锁的操作)。不同线程获取同一个锁的时候是互斥的。

获取锁

具体实现是在RedissonLock中的tryLockInnerAsync方法,源码如下:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

主要内容是在Lua脚本当中,从中可以看出Lua脚本的实现逻辑,逻辑流程图如下:

释放锁

具体实现是在RedissonLock中的unlockInnerAsync方法,源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

从Lua脚本中可以看出以下几点逻辑:

  • 线程标识:线程是使用uuid和线程id结合起来做唯一标识的,其中uuid是Redisson对象(RedissonClient接口的实现)的属性,即在创建Redisson对象时就生成了。在同一个应用节点当中,线程标识的uuid是相同的,而线程id是不同的;在不同的应用节点当中,线程标识的uuid是不同的,而线程id是可能相同的。总之,在分布式环境当中,线程标识是不会重复的。
  • 释放锁的时候只会释放当前线程自己的锁。利用hashKey判断(hashKey是线程标识)
  • 一个线程如果多次获取锁,必须有相等次数的释放锁,否则锁不会释放。利用value的增长值做判断
  • 释放锁之后,会发布一个消息到指定的channel,channel名称格式为:redisson_lock__channel:{key}

读写锁 ReadWriteLock

读写锁是将资源的操作者分为两类:读者、写者。它允许多个读者同时获取到资源,但只允许一个写者对资源进行操作。

  • redisson中的读写锁顶层接口是:RReadWriteLock,继承了jdk中的读写锁接口:ReadWriteLock
  • 读锁的实现类是 RedissonReadLock,写锁的实现类是 RedissonWriteLock
  • 获取锁方法都是 tryLockInnerAsync,释放锁方法都是 unlockInnerAsync
  • 在Redis中都是利用HASH数据结构,锁的key、hashKey与可重入锁类似,释放锁时也与上面几点可重入锁的逻辑类似
  • 不同于可重入锁的是,读写锁增加了一个hashKey为mode的数据,值为read或write,用于标识锁的类型
  • 获取锁的方式
// 获取读锁
redissonClient.getReadWriteLock(key).readLock()

// 获取写锁
redissonClient.getReadWriteLock(key).writeLock()

公平锁 FairLock

公平锁和可重入锁一样继承了jdk中的 java.util.concurrent.locks.Lock 接口,在提供了自动过期解锁功能的同时,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。公平锁的实现类是 RedissonFairLock。主要用到的Redis的两种数据结构及其作用

  • redisson_lock_queue:{key},List列表,用于存储线程列表,线程重复获取锁时会存在多个元素,保证获取锁的线程的顺序,实现优先分配给先发出请求的线程的功能。
  • redisson_lock_timeout:{key},SortedSet有序集合,用于存储线程获取锁时等待的超时时间,SortedSet中的score存储的是获取锁的等待超时时间,值越小说明越先请求获取锁,因此List中的线程顺序和SortedSet中的线程顺序是一致的(但并没有强行要求顺序必须一致)。线程重复获取锁时会重置score的值

获取锁

具体实现的方法是:tryLockInnerAsync,获取锁之前会清理等待超时的线程,Lua脚本如下:

// remove stale threads
while true do
local firstThreadId2 = redis.call("lindex", KEYS[2], 0);
if firstThreadId2 == false then
break;
end;
local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2));
if timeout <= tonumber(ARGV[4]) then
redis.call("zrem", KEYS[3], firstThreadId2);
redis.call("lpop", KEYS[2]);
else
break;
end;
end;

处理逻辑:

加锁处理的Lua源码:

if (redis.call("exists", KEYS[1]) == 0) and ((redis.call("exists", KEYS[2]) == 0)
or (redis.call("lindex", KEYS[2], 0) == ARGV[2])) then
redis.call("lpop", KEYS[2]);
redis.call("zrem", KEYS[3], ARGV[2]);
redis.call("hset", KEYS[1], ARGV[2], 1);
redis.call("pexpire", KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then
redis.call("hincrby", KEYS[1], ARGV[2], 1);
redis.call("pexpire", KEYS[1], ARGV[1]);
return nil;
end;

local firstThreadId = redis.call("lindex", KEYS[2], 0);
local ttl;
if firstThreadId ~= false and firstThreadId ~= ARGV[2] then
ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4]);
else
ttl = redis.call("pttl", KEYS[1]);
end;

local timeout = ttl + tonumber(ARGV[3]);
if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then
redis.call("rpush", KEYS[2], ARGV[2]);
end;
return ttl;

释放锁

公平锁的释放同样也遵循可重入锁介绍的几点逻辑,这里就不过多介绍,可以通过源码中的Lua脚本看到具体的实现。释放锁的时候也同样会先移除已等待超时的线程,处理逻辑与获取锁时一样。

以上就是redisson中可重入锁、读写锁、公平锁的一些实现方式。

spring boot starter的封装(基于spring AOP)

了解了以上几种锁的实现方式之后,我们可以结合spring AOP封装成spring boot starter,这样使用起来就会更加方便。此封装是在redisson-spring-boot-starter的基础之上进行的,请先阅读文章开头提到的 spring boot redisson starter的封装和使用

  • 首先,引入主要的依赖包,spring AOP支持和redisson-spring-boot-starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
<groupId>com.itopener</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
  • 既然是基于AOP的封装,需要定义一个方法级别的注解,注解的属性与redisson中分布式锁需要的参数保持一致,但额外增加一个锁类型的枚举,便于支持多种分布式锁源码如下:
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

import org.springframework.core.annotation.AliasFor;

/**
* @author fuwei.deng
* @date 2018年1月10日 上午10:47:50
* @version 1.0.0
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface LockAction {

/** 锁的资源,key。支持spring El表达式*/
@AliasFor("key")
String value() default "'default'";

@AliasFor("value")
String key() default "'default'";

/** 锁类型*/
LockType lockType() default LockType.REENTRANT_LOCK;

/** 获取锁等待时间,默认3秒*/
long waitTime() default 3000L;

/** 锁自动释放时间,默认30秒*/
long leaseTime() default 30000L;

/** 时间单位(获取锁等待时间和持锁时间都用此单位)*/
TimeUnit unit() default TimeUnit.MILLISECONDS;
}

关联的枚举定义如下:

/**
* @author fuwei.deng
* @date 2018年1月9日 上午11:25:59
* @version 1.0.0
*/
public enum LockType {

/** 可重入锁*/
REENTRANT_LOCK,

/** 公平锁*/
FAIR_LOCK,

/** 读锁*/
READ_LOCK,

/** 写锁*/
WRITE_LOCK;
}
  • 定义AOP配置和实现。对于加锁的资源,支持spring EL表达式,方便灵活的根据方法参数进行加锁
import java.lang.reflect.Method;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.itopener.redisson.spring.boot.autoconfigure.RedissonAutoConfiguration;

/**
* @author fuwei.deng
* @date 2017年6月14日 下午3:11:22
* @version 1.0.0
*/
@Aspect
@Configuration
@ConditionalOnBean(RedissonClient.class)
@AutoConfigureAfter(RedissonAutoConfiguration.class)
public class RedissonDistributedLockAspectConfiguration {

private final Logger logger = LoggerFactory.getLogger(RedissonDistributedLockAspectConfiguration.class);

@Autowired
private RedissonClient redissonClient;

private ExpressionParser parser = new SpelExpressionParser();

private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();

@Pointcut("@annotation(com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction)")
private void lockPoint(){

}

@Around("lockPoint()")
public Object around(ProceedingJoinPoint pjp) throws Throwable{
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
LockAction lockAction = method.getAnnotation(LockAction.class);
String key = lockAction.value();
Object[] args = pjp.getArgs();
key = parse(key, method, args);

RLock lock = getLock(key, lockAction);
if(!lock.tryLock(lockAction.waitTime(), lockAction.leaseTime(), lockAction.unit())) {
logger.debug("get lock failed [{}]", key);
return null;
}

//得到锁,执行方法,释放锁
logger.debug("get lock success [{}]", key);
try {
return pjp.proceed();
} catch (Exception e) {
logger.error("execute locked method occured an exception", e);
} finally {
lock.unlock();
logger.debug("release lock [{}]", key);
}
return null;
}

/**
* @description 解析spring EL表达式
* @author fuwei.deng
* @date 2018年1月9日 上午10:41:01
* @version 1.0.0
* @param key 表达式
* @param method 方法
* @param args 方法参数
* @return
*/
private String parse(String key, Method method, Object[] args) {
String[] params = discoverer.getParameterNames(method);
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < params.length; i ++) {
context.setVariable(params[i], args[i]);
}
return parser.parseExpression(key).getValue(context, String.class);
}

private RLock getLock(String key, LockAction lockAction) {
switch (lockAction.lockType()) {
case REENTRANT_LOCK:
return redissonClient.getLock(key);

case FAIR_LOCK:
return redissonClient.getFairLock(key);

case READ_LOCK:
return redissonClient.getReadWriteLock(key).readLock();

case WRITE_LOCK:
return redissonClient.getReadWriteLock(key).writeLock();

default:
throw new RuntimeException("do not support lock type:" + lockAction.lockType().name());
}
}
}
  • 最后就在resources/META-INF/spring.factories中配置这个类,以便spring boot自动加载配置
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itopener.lock.redisson.spring.boot.autoconfigure.RedissonDistributedLockAspectConfiguration
  • 这样,就可以在方法上加LockAction注解来使用分布式锁了,示例如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.itopener.demo.lock.redisson.vo.UserVO;
import com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction;
import com.itopener.lock.redisson.spring.boot.autoconfigure.LockType;

@Service
public class RedissonLockService {

private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class);

@LockAction("'updateKey'")
public void update(String key){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error("exp", e);
}
}

@LockAction("#userVO.id")
public void spel(UserVO userVO){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error("exp", e);
}
}

@LockAction(value = "#userVO.id", lockType = LockType.WRITE_LOCK, waitTime = 30000)
public void update(UserVO userVO){
logger.info("write user : {}", userVO.getId());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("exp", e);
}
}

@LockAction(value = "#userVO.id", lockType = LockType.READ_LOCK, waitTime = 30000)
public UserVO read(UserVO userVO) {
logger.info("read user : {}", userVO.getId());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
logger.error("exp", e);
}
return userVO;
}
}

参考资料:

源码:

https://gitee.com/itopener/springboot

  • starter目录:itopener-parent / spring-boot-starters-parent / lock-redisson-spring-boot-starter-parent
  • demo目录:itopener-parent / demo-parent / demo-lock-redisson

转载于:https://my.oschina.net/dengfuwei/blog/1604975

  • 点赞
  • 收藏
  • 分享
  • 文章举报
chyk1414 发布了0 篇原创文章 · 获赞 0 · 访问量 1035 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: