您的位置:首页 > 数据库 > Redis

Redis实现分布式环境下的分布式锁机制

2014-04-15 11:55 585 查看
Redis Redis命令参考

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis 是一个高性能的key-value数据库。 redis的出现,很大程度补偿memcached这类key/value存储的不足,在部 分场合可以对关系数据库起到很好的补充作用。它提供了Python,Ruby,Erlang,PHP客户端,使用很方便.

Redis支持主从同步。数据可以从主服务器向任意数量的从服务器上同步,从服务器可以是关联其他从服务器的主服务器。这使得Redis可执行单层树复制。从盘可以有意无意的对数据进行写操作。由于完全实现了发布/订阅机制,使得从数据库在任何地方同步树时,可订阅一个频道并接收主服务器完整的消息发布记录。同步对读取操作的可扩展性和数据冗余很有帮助。



分布式系统

       分布式系统(distributed system)是建立在网络之上的软件系统。正是因为软件的特性,所以分布式系统具有高度的内聚性和透明性。因此,网络和分布式系统之间的区别更多的在于高层软件(特别是操作系统),而不是硬件。内聚性是指每一个数据库分布节点高度自治,有本地的数据库管理系统。透明性是指每一个数据库分布节点对用户的应用来说都是透明的,看不出是本地还是远程。在分布式数据库系统中,用户感觉不到数据是分布的,即用户不须知道关系是否分割、有无副本、数据存于哪个站点以及事务在哪个站点上执行等。

Question

       由于分布式系统的特点,当我们的一个web项目跑在一个分布式的环境中。会实现类似于,当一个客户端用户向服务器发送一个请求时会会随即的分配到一台服务器上进行执行并返回结果。像这类类似于查询操作等有客户端发送请求在分布式环境下可以得到很快的响应对于整个系统也没有影响。但是有些场景下我们需要执行一些类似与定时任务之类的操作。由于整个系统的服务可能在多个服务器上都有配置所以可能会导致多个服务器一起跑一个定时任务,这种场景是不被允许的。所以需要设置一个锁来管理。

解决方法

        使用redis分布式锁来管理-》情景当操作一个定时任务时只允许一个操作。可以在操作上加上锁控制(在这个操作中,我尝试了使用了Spring的AOP来封装处理一个,实现在程序中加分布式锁只需要通过注解的形式处理)

Redis的XML配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd ">
<!-- 实现导入properties配置文件 -->
<util:properties id="app" location="classpath:app.properties"/>

<!-- jedisPoolConfig连接线程配置 -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="5"/>
<property name="maxIdle" value="2"/>
<property name="maxWaitMillis" value="10000"/>
<property name="testOnBorrow" value="true"/>
</bean>

<!-- jedisPool连接处理类 -->
<bean id="jedisPool" class="redis.clients.jedis.JedisPool">
<constructor-arg index="0" ref="jedisPoolConfig"/>
<constructor-arg index="1" value="127.0.0.1"/>
<constructor-arg index="2" value="6379" type="int"/>
</bean>

<!-- jedisTemplate是jedis的实现类 -->
<bean id="jedisTemplate" class="com.rabbit.util.RedisTemplate">
<constructor-arg index="0" ref="jedisPool"/>
</bean>
</beans>

封装Jedis的实现类

package com.rabbit.util;

import com.rabbit.ActionInterface.RedisAction;
import com.rabbit.ActionInterface.RedisNoResultAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.Pool;

/**
* 实现Jedis对象的封装,实现操作
*/
public class RedisTemplate {
private static Logger logger = LoggerFactory.getLogger(RedisTemplate.class);

//封装一个pool池用于jedis对象的管理
private Pool<Jedis> jedisPool;

public RedisTemplate(Pool<Jedis> jedisPool) {
this.jedisPool = jedisPool;
}

/**
* 通过观察者模式实现事件的回调处理(使用范型)
*
* @param jedisAction 需要执行的方法接口
* @param <T> 执行发放的返回类型
* @return T
* @throws JedisException
* 返回执行好的对象信息
*/
public <T> T execute(RedisAction<T> jedisAction){
T result = null;
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
result = jedisAction.action(jedis);
} catch (JedisConnectionException e) {
logger.error("Redis connection lost.", e);
broken = true;
} finally {
//释放jedis资源
closeResource(jedis, broken);
return result;
}
}

/**
* 通过观察者模式实现事件的回调处理
*/
public void execute(RedisNoResultAction jedisAction){
Jedis jedis = null;
boolean broken = false;
try {
jedis = jedisPool.getResource();
jedisAction.actionNoResult(jedis);
} catch (JedisConnectionException e) {
logger.error("Redis connection lost.", e);
broken = true;
} finally {
closeResource(jedis, broken);
}
}

/**
* 将jedis资源恢复到pool中
*
* @param jedis jedis资源
* @param connectionBroken 连接状态(根据不同的状态,使用不同的资源释放方式)
*/
protected void closeResource(Jedis jedis, boolean connectionBroken) {
if (jedis != null) {
if (connectionBroken) {
//当失败的连接的资源的方式
jedisPool.returnBrokenResource(jedis);
} else {
//当成功的连接的资源的方式
jedisPool.returnResource(jedis);
}
}
}

/**
* 获取jedisPool对象.
*/
public Pool<Jedis> getJedisPool() {
return jedisPool;
}
}


封装redis分布式锁对象
package com.rabbit.util;

import lombok.Getter;
import lombok.Setter;

/**
* Created by zero on 14-4-3.
*/

public class RedisLockModel{
//锁id
@Setter
@Getter
private String lockId;

//锁name
@Getter
private String lockName;

public RedisLockModel(String lockName){
this.lockName = lockName;
}
}


RedisLock注解(通过Spring AOP实现的)

package com.rabbit.annotation;

import com.rabbit.util.RedisKeys;

import java.lang.annotation.*;

/**
* 使用注解方式实现redis分布式锁操作的设计
* Created by zero on 14-4-8.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {
//redis的key对象
RedisKeys redisKeys();

//最长等待时间
int maxWait();

//键的过期时间
int expiredTime();
}


RedisLock的AOP的实现类
package com.rabbit.annotation;

import com.rabbit.util.RedisLockExecute;
import com.rabbit.util.RedisLockModel;
import com.rabbit.util.RedisTemplate;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 通过redis实现对于分布式的环境下的并发管理(通过redis锁实现并发处理)
* 通过注解实现
*/
@Aspect
@Component
public class RedisLockOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockOperation.class);

@Autowired
private RedisTemplate redisTemplate;

/**
* 创建一个redis锁
* @param proceed 实际处理对象
* @param redisLock RedisLock注解对象
*/
@Around("execution (* *(..)) && @annotation(redisLock)")
public void acquireLock(final ProceedingJoinPoint proceed, final RedisLock redisLock) {
RedisLockModel redisLockModel = RedisLockExecute.acquireLock(redisTemplate , redisLock.redisKeys(), redisLock.maxWait(), redisLock.expiredTime());

try{
if(RedisLockExecute.ACQUIRE_RESULT(redisLockModel)){
proceed.proceed();
}else{
LOGGER.debug("acquire lock is failed!");
}
} catch (Throwable throwable) {
LOGGER.error("proceed={} is failed!", proceed.toString());
throwable.printStackTrace();
}finally {
RedisLockExecute.releaseLock(redisTemplate , redisLockModel);
}
}
}

需要注意的是需要在Spring配置文件中加入<!-- 提供Spring Aop的支持 -->

 <aop:aspectj-autoproxy/>

RabbitLock处理类

package com.rabbit.util;

import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.rabbit.ActionInterface.RedisAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* 通过redis实现对于分布式的环境下的并发管理(通过redis锁实现并发处理)
*/
public class RedisLockExecute {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockExecute.class);

/**
* 创建一个redis锁
* @param redisTemplate redis处理对象
* @param redisKeys 需要添加的锁对象
* @param maxWait 最长的添加锁时的等待时间(单位:ms)
* @param expiredTime 设置锁的过期时间(单位:s)
* @return RedisLockModel
* 返回一个封装好后的一个锁对象
*/
public static RedisLockModel acquireLock(final RedisTemplate redisTemplate, final RedisKeys redisKeys, final Integer maxWait, final Integer expiredTime) {
//随即获取一个uuid,作为一个key的标识区别与,同名lock
final String uuid = UUID.randomUUID().toString();

Boolean result = redisTemplate.execute(new RedisAction<Boolean>(){
@Override
public Boolean action(Jedis jedis) {
//获取等待时间的最后时间段
long expireAt = System.currentTimeMillis()+maxWait;
//循环判断锁是否一直存在
while (System.currentTimeMillis() < expireAt){
LOGGER.debug("try lock key={} ", redisKeys.toString());
if (jedis.setnx(redisKeys.toString(), uuid) == 1) {
jedis.expire(redisKeys.toString(), expiredTime);
LOGGER.debug("get lock, key={}, expire in seconds={}", redisKeys.toString(), expiredTime);
return true;
} else {
// 存在锁(会一直执行(在最大等待时间范围内)等待该锁释放)
String desc = jedis.get(redisKeys.toString());
LOGGER.debug("key={} locked by another business={}", redisKeys.toString(), desc);
}

try{
TimeUnit.MILLISECONDS.sleep(1);
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

//在最大等待时间范围内未获取到锁
return false;
}
});

RedisLockModel redisLockModel = new RedisLockModel(redisKeys.toString());
redisLockModel.setLockId(result ? uuid : null);
return redisLockModel;
}

/**
* 通过获得的锁编号判断锁添加是否成功
* @param redisLockModel redis锁对象
* @return Boolean
* 返回是否添加成功
*/
public static Boolean ACQUIRE_RESULT(RedisLockModel redisLockModel){
return !Strings.isNullOrEmpty(redisLockModel.getLockId());
}

//释放锁
public static Boolean releaseLock(final RedisTemplate redisTemplate , final RedisLockModel redisLockModel) {
return redisTemplate.execute(new RedisAction<Boolean>() {
@Override
public Boolean action(Jedis jedis) {
//防止多个服务器处理同一个key需要watch操作(相当于是禁止了其他client处理这个key)
jedis.watch(redisLockModel.getLockName());

//如果锁没有过期,则锁的值必然没有改变
if (Objects.equal(redisLockModel.getLockId(), jedis.get(redisLockModel.getLockName()))) {
//删除锁
jedis.del(redisLockModel.getLockName());
return true;
}

//锁已经过期了,释放watch操作
jedis.unwatch();
return false;
}
});
}

//批量释放锁
public static Integer releaseBatchLock(final RedisTemplate redisTemplate , final List<RedisLockModel> redisLockModels){
return redisTemplate.execute(new RedisAction<Integer>() {
@Override
public Integer action(Jedis jedis) {
Transaction transaction = jedis.multi();

LOGGER.debug("release batch redis lock start");
for(RedisLockModel redisLockModel : redisLockModels){
if(Objects.equal(redisLockModel.getLockId() , jedis.get(redisLockModel.getLockName()))){
//删除未过期的锁
transaction.del(redisLockModel.getLockName());
}
}

//事务提交操作(返回执行了多少条事务)
Integer length = transaction.exec().size();
LOGGER.debug("release batch redis lock end, release num={}", length);

return length;
}
});
}

/**
* 销毁全部所有创建的lock
*
* @param redisTemplate redis操作对象
* @return Integer
* 返回总共销毁的创建条数
*/
public static Integer destructionLocks(final RedisTemplate redisTemplate){
return redisTemplate.execute(new RedisAction<Integer>() {
@Override
public Integer action(Jedis jedis) {
Transaction transaction = jedis.multi();

LOGGER.debug("destruction redis lock start");
for(RedisKeys redisKeys : RedisKeys.values()){
transaction.del(redisKeys.toString());
}

//事务提交操作(返回执行了多少条事务)
Integer length = transaction.exec().size();
LOGGER.debug("destruction redis lock end, release num={}", length);

return length;
}
});
}
}

这里就不再写测试类了,这个是我上一个博客关于Rabbit处理中的一部分代码。有兴趣的同学可以去看看
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis 并发 aop