分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)...
Redis与spring的整合
相关依赖jar包
spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version> 1.4 . 2 .RELEASE</version> </dependency>
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version> 2.6 . 2 </version> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version> 2.4 . 2 </version> </dependency> |
Spring 配置文件applicationContext.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
<!--命令空间中加入下面这行--> xmlns:p= "http://www.springframework.org/schema/p"
<!-- redis连接池配置文件 --> <context:property-placeholder location= "classpath:redis.properties" />
<bean id= "poolConfig" class = "redis.clients.jedis.JedisPoolConfig" > <property name= "maxIdle" value= "${redis.maxIdle}" /> <property name= "maxTotal" value= "${redis.maxTotal}" /> <property name= "MaxWaitMillis" value= "${redis.MaxWaitMillis}" /> <property name= "testOnBorrow" value= "${redis.testOnBorrow}" /> </bean> <bean id= "connectionFactory" class = "org.springframework.data. redis.connection.jedis.JedisConnectionFactory" p:host-name= "${redis.host}" p:port= "${redis.port}" p:password= "${redis.pass}" p:pool-config-ref= "poolConfig" /> <bean id= "redisTemplate" class = "org.springframework.data. redis.core.RedisTemplate" > <property name= "connectionFactory" ref= "connectionFactory" /> </bean> |
注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。
redis连接池配置文件redis.properties
1 2 3 4 5 6 7 8 |
redis.host= 192.168 . 2.129 redis.port= 6379 redis.pass=redis129
redis.maxIdle= 300 redis.maxTotal= 600 redis.MaxWaitMillis= 1000 redis.testOnBorrow= true |
好了,配置完成,下面写上代码
测试代码
User
1 2 3 4 5 6 7 8 9 |
@Entity @Table (name = "t_user" ) public class User { //主键 private String id; //用户名 private String userName; //...省略get,set... } |
BaseRedisDao
1 2 3 4 5 6 7 |
@Repository public abstract class BaseRedisDao<K,V> { @Autowired (required= true ) protected RedisTemplate<K, V> redisTemplate;
} |
IUserDao
1 2 3 4 5 6 7 8 9 10 11 |
public interface IUserDao { public boolean save(User user); public boolean update(User user);
public boolean delete(String userIds); public User find(String userId); } |
UserDao
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
@Repository public class UserDao extends BaseRedisDao<String, User> implements IUserDao { @Override public boolean save( final User user) { boolean res = redisTemplate.execute( new RedisCallback<Boolean>() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); byte [] key = serializer.serialize(user.getId()); byte [] value = serializer.serialize(user.getUserName()); //set not exits return connection.setNX(key, value); } }); return res; }
@Override public boolean update( final User user) { boolean result = redisTemplate.execute( new RedisCallback<Boolean>() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); byte [] key = serializer.serialize(user.getId()); byte [] name = serializer.serialize(user.getUserName()); //set connection.set(key, name); return true ; } }); return result; }
@Override public User find( final String userId) { User result = redisTemplate.execute( new RedisCallback<User>() { public User doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); byte [] key = serializer.serialize(userId); //get byte [] value = connection.get(key); if (value == null ) { return null ; } String name = serializer.deserialize(value); User resUser = new User(); resUser.setId(userId); resUser.setUserName(name); return resUser; } }); return result; }
@Override public boolean delete( final String userId) { boolean result = redisTemplate.execute( new RedisCallback<Boolean>() { public Boolean doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); byte [] key = serializer.serialize(userId); //delete connection.del(key); return true ; } }); return result; }
} |
Test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
@RunWith (SpringJUnit4ClassRunner. class ) @ContextConfiguration (locations = { "classpath*:applicationContext.xml" }) public class RedisTest extends AbstractJUnit4SpringContextTests { @Autowired private IUserDao userDao; @Test public void testSaveUser() { User user = new User(); user.setId( "402891815170e8de015170f6520b0000" ); user.setUserName( "zhangsan" ); boolean res = userDao.save(user); Assert.assertTrue(res); } @Test public void testGetUser() { User user = new User(); user = userDao.find( "402891815170e8de015170f6520b0000" ); System.out.println(user.getId() + "-" + user.getUserName() ); } @Test public void testUpdateUser() { User user = new User(); user.setId( "402891815170e8de015170f6520b0000" ); user.setUserName( "lisi" ); boolean res = userDao.update(user); Assert.assertTrue(res); } @Test public void testDeleteUser() { boolean res = userDao.delete( "402891815170e8de015170f6520b0000" ); Assert.assertTrue(res); } } |
String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 |
connection.hSetNX(key, field, value); connection.hDel(key, fields); connection.hGet(key, field);
connection.lPop(key); connection.lPush(key, value); connection.rPop(key); connection.rPush(key, values);
connection.sAdd(key, values); connection.sMembers(key); connection.sDiff(keys); connection.sPop(key); |
整合可能遇到的问题
1.NoSuchMethodError
1 2 3 |
java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V |
类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。
1.No qualifying bean
1 |
No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency |
找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。
基于redis实现的分布式锁
我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。
现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。
java中的锁技术
在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。
不加锁共享输出设备
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
public class LockTest { //不加锁 static class Outputer { public void output(String name) { for ( int i= 0 ; i<name.length(); i++) { System.out.print(name.charAt(i)); } System.out.println(); } } public static void main(String[] args) { final Outputer output = new Outputer(); //线程1打印zhangsan new Thread( new Runnable(){ @Override public void run() { while ( true ) { try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } output.output( "zhangsan" ); } } }).start(); //线程2打印lingsi new Thread( new Runnable(){ @Override public void run() { while ( true ) { try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } output.output( "lingsi" ); } } }).start(); //线程3打印wangwu new Thread( new Runnable(){ @Override public void run() { while ( true ) { try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } output.output( "huangwu" ); } } }).start(); } } |
上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:
1 2 3 4 5 6 7 8 9 10 11 |
huangwu zhangslingsi an huangwu zlingsi hangsan huangwu lzhangsan ingsi huangwu lingsi |
从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。
使用java5中的Lock对输出设备加锁
现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
//使用java5中的锁 static class Outputer{ Lock lock = new ReentrantLock(); public void output(String name) { //传统java加锁 //synchronized (Outputer.class){ lock.lock(); try { for ( int i= 0 ; i<name.length(); i++) { System.out.print(name.charAt(i)); } System.out.println(); } finally { //任何情况下都有释放锁 lock.unlock(); } //} } } |
看看加锁后的输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
zhangsan lingsi huangwu zhangsan lingsi huangwu zhangsan lingsi huangwu zhangsan lingsi huangwu zhangsan lingsi huangwu ...... |
从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。
Redis分布式锁
实现分析
从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:
l 互斥,不管任何时候,只有一个客户端能持有同一个锁。
l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。
l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。
那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,
(1)线程1在master节点拿到了锁(存入key)
(2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点
(3)slaver节点升级为master节点
(4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。
可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。
那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。
为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。
Redlock算法
根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:
RedLock算法主要流程
Java实现
结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
public class RedisLock implements Lock{
protected StringRedisTemplate redisStringTemplate;
// 存储到redis中的锁标志 private static final String LOCKED = "LOCKED" ;
// 请求锁的超时时间(ms) private static final long TIME_OUT = 30000 ;
// 锁的有效时间(s) public static final int EXPIRE = 60 ;
// 锁标志对应的key; private String key;
// state flag private volatile boolean isLocked = false ;
public RedisLock(String key) { this .key = key; @SuppressWarnings ( "resource" ) ApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath*:applicationContext.xml" ); redisStringTemplate = (StringRedisTemplate)ctx.getBean( "redisStringTemplate" ); }
@Override public void lock() { //系统当前时间,毫秒 long nowTime = System.nanoTime(); //请求锁超时时间,毫秒 long timeout = TIME_OUT* 1000000 ; final Random r = new Random(); try { //不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁 //这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间 //如果一个master节点不可用了,应该尽快尝试下一个master节点 while ((System.nanoTime() - nowTime) < timeout) { //将锁作为key存储到redis缓存中,存储成功则获得锁 if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(), LOCKED.getBytes())) { //设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间 //可以防止因异常情况无法释放锁而造成死锁情况的发生 redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS); isLocked = true ; //上锁成功结束请求 break ; } //获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现 //睡眠3毫秒后继续请求锁 Thread.sleep( 3 , r.nextInt( 500 )); } } catch (Exception e) { e.printStackTrace(); } }
@Override public void unlock() { //释放锁 //不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作 if (isLocked) { redisStringTemplate.delete(key); } }
@Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub }
@Override public boolean tryLock() { // TODO Auto-generated method stub return false ; }
@Override public boolean tryLock( long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false ; }
@Override public Condition newCondition() { // TODO Auto-generated method stub return null ; } } |
好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/使用RedisLock static class Outputer { //创建一个名为redisLock的RedisLock类型的锁 RedisLock redisLock = new RedisLock( "redisLock" ); public void output(String name) { //上锁 redisLock.lock(); try { for ( int i= 0 ; i<name.length(); i++) { System.out.print(name.charAt(i)); } System.out.println(); } finally { //任何情况下都要释放锁 redisLock.unlock(); } } } |
看看使用RedisLock加锁后的的运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
lingsi zhangsan huangwu lingsi zhangsan huangwu lingsi zhangsan huangwu lingsi zhangsan huangwu lingsi zhangsan huangwu ...... |
可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。
问题
现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for idle object都没有出现线程“打架”的问题)。我认为RedLock算法仍有些问题没说清楚,比如,如何防止宕机时多个线程同时获得锁;RedLock算法在释放锁的处理上,不管线程是否获取锁成功,只要上了锁,就会到每个master节点上释放锁,这就会导致一个线程上的锁可能会被其他线程释放掉,这就和每个锁只能被获得锁的线程释放相互矛盾。这些有待后续进一步交流学习研究。
参考文档
http://redis.io/topics/distlock
作者:ITPSC
出处:http://www.cnblogs.com/hjwublog/
转载于:https://my.oschina.net/zyBourn/blog/1627687
- 点赞
- 收藏
- 分享
- 文章举报
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(五)——spring-data-redis与JedisPool的区别、使用ShardedJedisPool与spring集成的实现及一致性哈希分析
- 分布式缓存技术redis学习系列(五)——spring-data-redis与JedisPool的区别、使用ShardedJedisPool与spring集成的实现及一致性哈希分析
- 分布式缓存技术redis学习系列(五)——spring-data-redis与JedisPool的区别、使用ShardedJedisPool与spring集成的实现及一致性哈希分析
- 10、Spring技术栈-整合Redis,使用RedisTemplate实现数据缓存实战
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- redis系列之5----redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(九)——Redis主从实现读写分离
- 分布式缓存技术redis学习系列(六)—— 深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 技术文章 | 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列(二)——详细讲解redis数据结构(内存模型)以及常用命令
- 分布式缓存技术redis学习系列(三)——redis高级应用(主从、事务与锁、持久化)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)