您的位置:首页 > 数据库 > Redis

基于Redis setNX 实现分布式锁

2018-02-08 21:34 375 查看
单机环境如果对资源进行互斥访问可以通过语言原生的语法实现,如java中的sychronized关键字或者Lock锁,但是在分布式环境中,就需要有一个中心节点来保证资源的互斥访问。本文使用redis的setNX语句实现分布式锁。我们先简单实现一个分布式锁,如下: setNx(key,value); expire(key,timeout); //业务逻辑 delete(key);上面的方案有2个问题: 1.执行完setNx(key,value)之后,如果程序挂了,那锁永远不会释放,其余的线程永远不会获取到锁。 2.如果业务逻辑花费太长的时间,锁过期被其余线程获取,但是等业务逻辑之后,锁直接被删除,这样可能多个线程可 以获取到。针对第一个问题,可以把过期时间放到setNx的值中,根据过期时间判断锁是否过期,如果过期设置新的过期时间。针对第二个问题,在删除锁的时候,必须确保删除的是本线程加的锁。上面针对第一个问题的解决方案是,在判断是否过期,并设置新的过期时间是两部操作,在多线程环境下,可能存在多个线程同时获得锁的情况,可以使用redis原生的命令getset命令,解决此问题。此外还有一个问题,在集群环境下,可能存在服务器时间不一致的情况,可用从redis服务器获取时间,保证时间一致,但是会多请求一次redis。下面贴出代码实现:
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
* 分布式锁工具<br/>
* 说明:基于redis,使用setNx命令。使用value为时间戳的形式,保证不会一直持有锁
*
* Created by xxon 2018/2/7.
*/
@Component
public class DistributedLockUtil {

private static RedisTemplate<Object, Object> redisTemplate;

/**
* 加锁默认超时时间
*/
private static final long DEFAULT_TIMEOUT_SECOND = 5;

/**
* 加锁循环等待时间
*/
private static final long LOOP_WAIT_TIME_MILLISECOND = 30;

/**
* 加锁
* @param key
* @param timeoutSecond 如果为null,使用默认超时时间
* @return 加锁的值(超时时间)
*/
public static long lock(String key, Long timeoutSecond){

LogConstant.workorder.info("Thread:" + Thread.currentThread().getName() + " start lock");

//如果参数错误
if(timeoutSecond != null && timeoutSecond <= 0){
timeoutSecond = DEFAULT_TIMEOUT_SECOND;
}
timeoutSecond = timeoutSecond == null? DEFAULT_TIMEOUT_SECOND : timeoutSecond;

while (true){

//超时时间点
long timeoutTimeMilli = currentTimeMilliForRedis() + timeoutSecond * 1000;

//如果设置成功
if(redisTemplate.opsForValue().setIfAbsent(key, timeoutTimeMilli)){
LogConstant.workorder.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}

//如果已经超时
Long value = (Long)redisTemplate.opsForValue().get(key);
if(value != null && value.longValue() < currentTimeMilliForRedis()) {

//设置新的超时时间
Long oldValue = (Long) redisTemplate.opsForValue().getAndSet(key, timeoutTimeMilli);//旧的值

//多个线程同时getset,只有第一个才可以获取到锁
if (value.equals(oldValue)) {
LogConstant.workorder.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
}

//延迟一定毫秒,防止请求太频繁
try {
Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
} catch (InterruptedException e) {
LogConstant.workorder.error("DistributedLockUtil lock sleep error", e);
}
}
}

/**
* 释放锁
* @param key
* @param lockValue
*/
public static void unLock(String key, long lockValue){

LogConstant.workorder.info("Thread:" + Thread.currentThread().getName() + " start unlock");

Long value = (Long)redisTemplate.opsForValue().get(key);
if(value != null && value.equals(lockValue)) {//如果是本线程加锁
redisTemplate.delete(key);
LogConstant.workorder.info("Thread:" + Thread.currentThread().getName() + " unlock success");
}
}

/**
* redis服务器时间
* @return
*/
private static long currentTimeMilliForRedis(){

return redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
return connection.time();
}
});
}

@PostConstruct
private static void init(){
redisTemplate = (RedisTemplate)SpringContextUtil.getBean("redisTemplate");
}
}

另外,在redis2.6之后,redis支持使用set命令直接设置过期时间并提供setNx相似的功能,由于命令是原子化的,可以减少编程的复杂度,预发如下:set key value [EX seconds] [PX milliseconds] [NX|XX]  。缺点:此种实现强依赖redis,如果redis挂了,分布式锁服务将不可用。另外现在redis基本都是主从结构,如果主节点挂了,切换到从节点,可能会存在多个线程同时获取到锁的情形。   针对上述缺点,有一种RedLock的方案,限于篇幅不展开细说。此外基于Zookeeper实现的分布式锁,也可以解决上述缺点。

PS:如果表述有不当之处,欢迎指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: