您的位置:首页 > 数据库 > Redis

使用redis实现分布式锁(在集群环境下让任务排队执行)

2017-11-19 20:40 1041 查看
需求:在集群环境下,读写同一个数据库表,我们为了保证数据的最终一致性,需要让任务排队执行。分布式锁的实现方式,网上有很多种方式。

1.使用数据库表实现;

2.使用zookeeper实现;

3.使用redis实现;

这里讲用redis实现的方法,其他两种实现方式,读者可以自行百度。

redis是个很好的NoSQL数据库,多用于缓存数据的场景,但同时也可以用来制作一个分布式事务锁,其实现的原理基于几个命令:

SETNX key val;

解释:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

expire key timeout;

为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

delete key;

删除key。

那么如何实现这个分布式事务锁呢?

1.添加项目中的redis的jar包依赖;

<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring-data-redis.version}</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${redis.version}</version>
</dependency>


2.在项目的Spring配置文件中配置redis

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<!-- 最大空闲数 -->
<property name="maxIdle" value="300" />
<!-- 最大连接数 -->
<property name="maxTotal" value="600" />
<!-- 最大建立连接等待时间,单位毫秒-->
<property name="maxWaitMillis" value="200000" />
<!-- 指明是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个 -->
<property name="testOnBorrow" value="true" />

</bean>

<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="119.23.134.93" />
<property name="port" value="6379"/>
<property name="password" value="coeexp123456"/>
<property name="poolConfig" ref="poolConfig" />
<property name="usePool" value="true"/>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory"   ref="jedisConnectionFactory" />
</bean>


3.在实际开发中都是使用Spring的注解,面向切面编程,

这里需要自定义两个注解:

注解1:用于方法上
/**
* 类说明
* @author  ll
* @version 创建时间:2017年10月27日上午10:10:47
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
//@Component
public @interface P4jSyn {
/**
* 锁的key<br/>
* 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/>
* redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/>
*
*/
String synKey();

/**
* 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/>
* 单位毫秒,默认20秒<br/>
* 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/>
* 但是没有比较强的业务要求下,不建议设置为0
*/
long keepMills() default 20 * 1000;

/**
* 当获取锁失败,是继续等待还是放弃<br/>
* 默认为继续等待
*/
boolean toWait() default true;

/**
* 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/>
* 默认为10毫秒
*
* @return
*/
long sleepMills() default 10;

/**
* 锁获取超时时间:<br/>
* 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出
* {@link java.util.concurrent.TimeoutException.TimeoutException}
* ,可捕获此异常做相应业务处理;<br/>
* 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去;
*
* @return
*/
long maxSleepMills() default 60 * 1000;
}


注解2:用于参数上

/**
* 类说明
* @author  ll
* @version 创建时间:2017年10月27日上午10:12:25
*/
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface P4jSynKey {
/**
* key的拼接顺序
*
* @return
*/
int index() default 0;
}


4.分布式锁的切面类

/**
* 类说明
* @author   ll
* @version 创建时间:2017年10月27日上午10:14:06
*/
@Order(1)
@Aspect
@Component("redisLockAspect")
public class RedisLockAspect {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);
@Autowired
@Qualifier("redisTemplate")
private RedisTemplate<String, Long> redisTemplate;

@Around("execution(* com.coe..*SycLock(..))")
public Object lock(ProceedingJoinPoint  pjp) throws Throwable {
//获取P4jSyn注解
P4jSyn lockInfo = getLockInfo(pjp);
if (lockInfo == null) {
throw new IllegalArgumentException("配置参数错误");
}
String synKey = getSynKey(pjp, lockInfo.synKey());
if (synKey == null || "".equals(synKey)) {
throw new IllegalArgumentException("配置参数synKey错误");
}
boolean lock = false;  //标志物,true表示获取了到了该锁
Object obj = null;
try {
//超时时间 (60秒),系统当前时间再往后加60秒
long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();
while (!lock) {
//持锁时间,系统当前时间再往后加20秒
long keepMills = System.currentTimeMillis() + lockInfo.keepMills();
//为key“synKey”设置值keepMills,如果设置成功,则返回true
lock = setIfAbsent(synKey, keepMills);
//lock为true表示得到了锁,没有人加过相同的锁
if(lock){
//如果获得了该锁,则调用目标方法,执行业务逻辑任务
obj = pjp.proceed();
}
// 锁设置了没有超时时间
/**如果没有通过setIfAbsent拿到数据,然后判断是否对锁设置了超时机制
,没有设置则判断是否需要继续等待*/
else if(lockInfo.keepMills() <= 0){
// 继续等待获取锁
if (lockInfo.toWait()) {
// 如果超过最大等待时间抛出异常
if(lockInfo.maxSleepMills()>0&&System.currentTimeMillis()> maxSleepMills){
throw new TimeoutException("获取锁资源等待超时");
}
//只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁
TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
}else{
//如果注解上的“toWait()”为false,表示如果当前没有获取到锁,则放弃获取该锁,
//即放弃执行此任务
break;
}
}
// 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁
/**
* 1.如果当前线程2进入的时候,
* 系统时间已经大于了上个任务的持锁时间(由于上次任务大导致其执行时间过长),
* 则表示需要强制让上个任务释放锁,让本任务获得锁,以执行本次任务;
* 2.如果线程1释放了锁,刚好线程2过了 if(lock){ //to do something}的判断,
* 而进入了此处判断,需要对线程2任务加锁,保证事务不冲突
*/
else if(System.currentTimeMillis()>getLock(synKey)&&(System.currentTimeMillis()> getSet(synKey, keepMills))) {
lock = true;             //lock一定要设置成true,不然释放不了锁
obj = pjp.proceed();
}
// 没有得到任何锁
else {
// 继续等待获取锁
if (lockInfo.toWait()) {
// 如果超过最大等待时间抛出异常
if (lockInfo.maxSleepMills()>0&&System.currentTimeMillis() maxSleepMills) {
throw new TimeoutException("获取锁资源等待超时");
}
//只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁
TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
}else {
// 放弃等待,放弃获取锁(放弃本任务的执行)
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
// 如果获取到了锁,释放锁
if (lock) {
releaseLock(synKey);
}
}
return obj;
}

/**
* 获取包括方法参数上的key<br/>
* redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey
*
*/
private String getSynKey(JoinPoint pjp, String synKey) {
try {
synKey = "RedisSyn+" + synKey;  //指定synKey的值固定为RedisSyn+synKey
Object[] args = pjp.getArgs();  //获取切点上的所有参数
if (args != null && args.length > 0) {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();
SortedMap<Integer, String> keys = new TreeMap<Integer, String>();
for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {
P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);
if (p4jSynKey != null) {
Object arg = args[ix];
if (arg != null) {
keys.put(p4jSynKey.index(), arg.toString());
}
}
}
if (keys != null && keys.size() > 0) {
for (String key : keys.values()) {
synKey = synKey + key;
}
}
}
return synKey;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

@SuppressWarnings("unchecked")
private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {
if (annotations != null && annotations.length > 0) {
for (final Annotation annotation : annotations) {
if (annotationClass.equals(annotation.annotationType())) {
return (T) annotation;
}
}
}
return null;
}

/**
* 获取RedisLock注解信息
*/
private P4jSyn getLockInfo(ProceedingJoinPoint  pjp) {
try {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);
return lockInfo;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

public BoundValueOperations<String, Long> getOperations(String key) {
return redisTemplate.boundValueOps(key);
}

/**
* Set {@code value} for {@code key}, only if {@code key} does not exist.
* <p>
* See http://redis.io/commands/setnx *
* @param key
*            must not be {@literal null}.
* @param value
*            must not be {@literal null}.
* @return
*/
/**
* 如果key不存在,则为key设置值value,并且返回true,否则返回false
* @param key
* @param value
* @return
*/
public boolean setIfAbsent(String key, Long value) {
return getOperations(key).setIfAbsent(value);
}

/**
* 获取key上的值
* @param key
* @return
*/
public long getLock(String key) {
Long time = getOperations(key).get();
if (time == null) {
return 0;
}
return time;
}

/**
* 获取key上的旧值,并且为该key设置新值value,如果旧值不存在则返回0
* @param key
* @param value
* @return
*/
public long getSet(String key, Long value) {
Long time = getOperations(key).getAndSet(value);
if (time == null) {
return 0;
}
return time;
}

/**
* 删除key
* @param key
*/
public void releaseLock(String key) {
redisTemplate.delete(key);
}
}


锁写好之后,编写测试代码:

定义一个成员变量i,启动100个线程同时访问这个方法,让i++;

public class LockInfo {
private int i = 0;

@P4jSyn(synKey="getTrackno")
public void addSycLock(@P4jSynKey(index=1)String flag,@P4jSynKey(index=2) String channelCode){
i++;
System.out.println("i =====================" + i);
}
}


开启100个线程同时执行这段代码

@Component("lockTest")
public class LockTest {
@Autowired
private LockInfo lockInfo ;

@Scheduled(fixedRate=3600000,initialDelay = 10000)
public void run(){
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
public void run() {
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockInfo .addSycLock("11111", "222222");
}
}).start();
}
}
}


测试结果:虽然同时开启100个线程来调用这个方法,但是i的值始终是依次递增,大家可以试试,去掉注解之后再同时开启100个线程来调用这个方法,看是不是得到不同的结果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐