您的位置:首页 > 数据库 > Redis

分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)...

2020-01-11 15:29 549 查看

Redis与spring的整合

相关依赖jar包

spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

<dependency>

    
<groupId>org.springframework.data</groupId>

    
<artifactId>spring-data-redis</artifactId>

    
<version>
1.4
.
2
.RELEASE</version>

</dependency>

 

<dependency>

    
<groupId>redis.clients</groupId>

    
<artifactId>jedis</artifactId>

    
<version>
2.6
.
2
</version>

</dependency>

 

<dependency>

    
<groupId>org.apache.commons</groupId>

    
<artifactId>commons-pool2</artifactId>

    
<version>
2.4
.
2
</version>

</dependency>

Spring 配置文件applicationContext.xml

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

<!--命令空间中加入下面这行-->

xmlns:p=
"http://www.springframework.org/schema/p"

 

<!-- redis连接池配置文件 -->

<context:property-placeholder location=
"classpath:redis.properties"
 
/>  

 

<bean id=
"poolConfig"
 
class
=
"redis.clients.jedis.JedisPoolConfig"
>  

    
<property name=
"maxIdle"
 
value=
"${redis.maxIdle}"
 
/>  

    
<property name=
"maxTotal"
 
value=
"${redis.maxTotal}"
 
/>  

    
<property name=
"MaxWaitMillis"
 
value=
"${redis.MaxWaitMillis}"
 
/>  

    
<property name=
"testOnBorrow"
 
value=
"${redis.testOnBorrow}"
 
/>  

</bean>  

  
 

<bean id=
"connectionFactory"
 
class
=
"org.springframework.data.    redis.connection.jedis.JedisConnectionFactory"
 

    
p:host-name=
"${redis.host}"
 
p:port=
"${redis.port}"

    
p:password=
"${redis.pass}"
  
p:pool-config-ref=
"poolConfig"
/>  

  
 

<bean id=
"redisTemplate"
 
class
=
"org.springframework.data.    redis.core.RedisTemplate"
>  

    
<property name=
"connectionFactory"
   
ref=
"connectionFactory"
 
/>  

</bean>

  

注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。

redis连接池配置文件redis.properties

1

2

3

4

5

6

7

8

redis.host=
192.168
.
2.129

redis.port=
6379
 

redis.pass=redis129  

 

redis.maxIdle=
300
 

redis.maxTotal=
600
 

redis.MaxWaitMillis=
1000
 

redis.testOnBorrow=
true

好了,配置完成,下面写上代码

回到顶部

测试代码

User

1

2

3

4

5

6

7

8

9

@Entity

@Table
(name = 
"t_user"
)

public
 
class
 
User {

    
//主键

    
private
 
String id;

    
//用户名

    
private
 
String userName;

        
//...省略get,set...

}

BaseRedisDao

1

2

3

4

5

6

7

@Repository

public
 
abstract
 
class
 
BaseRedisDao<K,V> {

    
 

    
@Autowired
(required=
true
)  

    
protected
 
RedisTemplate<K, V> redisTemplate;

 

}

IUserDao 

1

2

3

4

5

6

7

8

9

10

11

public
 
interface
 
IUserDao {

    
 

    
public
 
boolean
 
save(User user);

    
 

    
public
 
boolean
 
update(User user);

 

    
public
 
boolean
 
delete(String userIds);

    
 

    
public
 
User find(String userId);

    
 

}

UserDao 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

@Repository

public
 
class
 
UserDao 
extends
 
BaseRedisDao<String, User> 
implements
 
IUserDao {

    
 

    
@Override

    
public
 
boolean
 
save(
final
 
User user) {

        
boolean
 
res = redisTemplate.execute(
new
 
RedisCallback<Boolean>() {

            
public
 
Boolean doInRedis(RedisConnection connection) 
throws
 
DataAccessException {

                
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();

                
byte
[] key = serializer.serialize(user.getId());

                
byte
[] value = serializer.serialize(user.getUserName());

                
//set not exits

                
return
 
connection.setNX(key, value);

            
}

        
});

        
return
 
res;

    
}

 

    
@Override

    
public
 
boolean
 
update(
final
 
User user) {

        
boolean
 
result = redisTemplate.execute(
new
 
RedisCallback<Boolean>() {  

            
public
 
Boolean doInRedis(RedisConnection connection) 
throws
 
DataAccessException {  

                
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  

                
byte
[] key  = serializer.serialize(user.getId());  

                
byte
[] name = serializer.serialize(user.getUserName());  

                
//set

                
connection.set(key, name);  

                
return
 
true
;  

            
}  

        
});  

        
return
 
result;

    
}

 

    
@Override

    
public
 
User find(
final
 
String userId) {

        
User result = redisTemplate.execute(
new
 
RedisCallback<User>() {  

            
public
 
User doInRedis(RedisConnection connection) 
throws
 
DataAccessException {  

                
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  

                
byte
[] key = serializer.serialize(userId);

                
//get

                
byte
[] value = connection.get(key);  

                
if
 
(value == 
null
) {  

                    
return
 
null
;  

                
}  

                
String name = serializer.deserialize(value); 

                
User resUser = 
new
 
User();

                
resUser.setId(userId);

                
resUser.setUserName(name);

                
return
 
resUser;  

            
}  

        
});  

        
return
 
result;  

    
}

 

    
@Override

    
public
 
boolean
 
delete(
final
 
String userId) {

        
boolean
 
result = redisTemplate.execute(
new
 
RedisCallback<Boolean>() {  

            
public
 
Boolean doInRedis(RedisConnection connection) 
throws
 
DataAccessException {  

                
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  

                
byte
[] key  = serializer.serialize(userId);  

                
//delete 

                
connection.del(key);

                
return
 
true
;  

            
}  

        
});  

        
return
 
result;

    
}

 

}

Test

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

@RunWith
(SpringJUnit4ClassRunner.
class
)

@ContextConfiguration
(locations = {
"classpath*:applicationContext.xml"
})  

public
 
class
 
RedisTest 
extends
 
AbstractJUnit4SpringContextTests {  

      
 

    
@Autowired
 

    
private
 
IUserDao userDao;

    
 

    
@Test
 

    
public
 
void
 
testSaveUser() {  

        
User user = 
new
 
User();  

        
user.setId(
"402891815170e8de015170f6520b0000"
);  

        
user.setUserName(
"zhangsan"
);  

        
boolean
 
res = userDao.save(user);

        
Assert.assertTrue(res);  

    
}  

    
 

    
@Test
 

    
public
 
void
 
testGetUser() {  

        
User user = 
new
 
User();  

        
user = userDao.find(
"402891815170e8de015170f6520b0000"
);

        
System.out.println(user.getId() + 
"-"
 
+ user.getUserName() );  

    
}  

    
 

    
@Test
 

    
public
 
void
 
testUpdateUser() {  

        
User user = 
new
 
User();  

        
user.setId(
"402891815170e8de015170f6520b0000"
);  

        
user.setUserName(
"lisi"
);  

        
boolean
 
res = userDao.update(user);

        
Assert.assertTrue(res);  

    
}  

    
 

    
@Test
 

    
public
 
void
 
testDeleteUser() {  

        
boolean
 
res = userDao.delete(
"402891815170e8de015170f6520b0000"
);

        
Assert.assertTrue(res);  

    
}  

   
 

}

  

String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下

 

1

2

3

4

5

6

7

8

9

10

11

12

13

connection.hSetNX(key, field, value);

connection.hDel(key, fields);

connection.hGet(key, field);

 

connection.lPop(key);

connection.lPush(key, value);

connection.rPop(key);

connection.rPush(key, values);

 

connection.sAdd(key, values);

connection.sMembers(key);

connection.sDiff(keys);

connection.sPop(key);

  

回到顶部

整合可能遇到的问题

1.NoSuchMethodError

1

2

3

java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V

 

Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

  

类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。

1.No qualifying bean

1

No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found 
for
 
dependency

  

找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。

回到顶部

基于redis实现的分布式锁

我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。

 

现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。

java中的锁技术

在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。

不加锁共享输出设备

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

public
 
class
 
LockTest {

    
//不加锁

    
static
 
class
 
Outputer {

        
public
 
void
 
output(String name) {

            
for
(
int
 
i=
0
; i<name.length(); i++) {

                
System.out.print(name.charAt(i));

            
}

            
System.out.println();

        
}

    
}

    
public
 
static
 
void
 
main(String[] args) {

        
final
 
Outputer output = 
new
 
Outputer();

        
//线程1打印zhangsan

        
new
 
Thread(
new
 
Runnable(){

            
@Override

            
public
 
void
 
run() {

                
while
(
true
) {

                     
try
{

                         
Thread.sleep(
1000
);

                     
}
catch
(InterruptedException e) {

                         
e.printStackTrace();

                     
}

                     
output.output(
"zhangsan"
);

                
}   

            
}

        
}).start();

        
 

        
//线程2打印lingsi

        
new
 
Thread(
new
 
Runnable(){

            
@Override

            
public
 
void
 
run() {

                
while
(
true
) {

                     
try
{

                         
Thread.sleep(
1000
);

                     
}
catch
(InterruptedException e) {

                         
e.printStackTrace();

                     
}

                     
output.output(
"lingsi"
);

                
}

            
}

        
}).start();

        
 

        
//线程3打印wangwu

        
new
 
Thread(
new
 
Runnable(){

            
@Override

            
public
 
void
 
run() {

                
while
(
true
) {

                     
try
{

                         
Thread.sleep(
1000
);

                     
}
catch
(InterruptedException e) {

                         
e.printStackTrace();

                     
}

                     
output.output(
"huangwu"
);

                
}

            
}

        
}).start();

    
}

}

 

上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:

 

1

2

3

4

5

6

7

8

9

10

11

huangwu

zhangslingsi

an

huangwu

zlingsi

hangsan

huangwu

lzhangsan

ingsi

huangwu

lingsi

  

从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。

使用java5中的Lock对输出设备加锁

现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

//使用java5中的锁

static
 
class
 
Outputer{

    
Lock lock = 
new
 
ReentrantLock();

    
public
 
void
 
output(String name) {

        
//传统java加锁

        
//synchronized (Outputer.class){

        
lock.lock();

        
try
 
{

            
for
(
int
 
i=
0
; i<name.length(); i++) {

                
System.out.print(name.charAt(i));

            
}

            
System.out.println();

        
}
finally
{

            
//任何情况下都有释放锁

            
lock.unlock();

        
}   

        
//}

    
}

}

  

看看加锁后的输出结果:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

zhangsan

lingsi

huangwu

zhangsan

lingsi

huangwu

zhangsan

lingsi

huangwu

zhangsan

lingsi

huangwu

zhangsan

lingsi

huangwu

......

  

从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。

Redis分布式锁

实现分析

从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:

l 互斥,不管任何时候,只有一个客户端能持有同一个锁。

l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。

l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。

那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,

(1)线程1在master节点拿到了锁(存入key)

(2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点

(3)slaver节点升级为master节点

(4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。

可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。

 

那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。

 

为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。

Redlock算法

根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:

 

RedLock算法主要流程

 

 

Java实现

 

结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

public
 
class
 
RedisLock 
implements
 
Lock{

 

    
protected
 
StringRedisTemplate redisStringTemplate;

 

    
// 存储到redis中的锁标志

    
private
 
static
 
final
 
String LOCKED = 
"LOCKED"
;

 

    
// 请求锁的超时时间(ms)

    
private
 
static
 
final
 
long
 
TIME_OUT = 
30000
;

 

    
// 锁的有效时间(s)

    
public
 
static
 
final
 
int
 
EXPIRE = 
60
;

 

    
// 锁标志对应的key;

    
private
 
String key;

 

    
// state flag

    
private
 
volatile
 
boolean
 
isLocked = 
false
;

 

    
public
 
RedisLock(String key) {

        
this
.key = key;

        
@SuppressWarnings
(
"resource"
)

        
ApplicationContext  ctx =  
new
 
ClassPathXmlApplicationContext(
"classpath*:applicationContext.xml"
);

        
redisStringTemplate = (StringRedisTemplate)ctx.getBean(
"redisStringTemplate"
);

    
}

 

    
@Override

    
public
 
void
 
lock() {

        
//系统当前时间,毫秒

        
long
 
nowTime = System.nanoTime();

        
//请求锁超时时间,毫秒

        
long
 
timeout = TIME_OUT*
1000000
;

        
final
 
Random r = 
new
 
Random();

        
try
 
{

            
//不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁

            
//这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间

            
//如果一个master节点不可用了,应该尽快尝试下一个master节点

            
while
 
((System.nanoTime() - nowTime) < timeout) {

                
//将锁作为key存储到redis缓存中,存储成功则获得锁

                
if
 
(redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),

                        
LOCKED.getBytes())) {

                    
//设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间

                    
//可以防止因异常情况无法释放锁而造成死锁情况的发生

                    
redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);

                    
isLocked = 
true
;

                    
//上锁成功结束请求

                    
break
;

                
}

                
//获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现

                
//睡眠3毫秒后继续请求锁

                
Thread.sleep(
3
, r.nextInt(
500
));

            
}

        
catch
 
(Exception e) {

            
e.printStackTrace();

        
}

    
}

 

    
@Override

    
public
 
void
 
unlock() {

        
//释放锁

        
//不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作

        
if
 
(isLocked) {

            
redisStringTemplate.delete(key);

        
}

    
}

 

    
@Override

    
public
 
void
 
lockInterruptibly() 
throws
 
InterruptedException {

        
// TODO Auto-generated method stub

        
 

    
}

 

    
@Override

    
public
 
boolean
 
tryLock() {

        
// TODO Auto-generated method stub

        
return
 
false
;

    
}

 

    
@Override

    
public
 
boolean
 
tryLock(
long
 
time, TimeUnit unit) 
throws
 
InterruptedException {

        
// TODO Auto-generated method stub

        
return
 
false
;

    
}

 

    
@Override

    
public
 
Condition newCondition() {

        
// TODO Auto-generated method stub

        
return
 
null
;

    
}

}

 

好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

/使用RedisLock

static
 
class
 
Outputer {

    
//创建一个名为redisLock的RedisLock类型的锁

    
RedisLock redisLock = 
new
 
RedisLock(
"redisLock"
);

    
public
 
void
 
output(String name) {

        
//上锁

        
redisLock.lock();

        
try
 
{

            
for
(
int
 
i=
0
; i<name.length(); i++) {

                
System.out.print(name.charAt(i));

            
}

            
System.out.println();

        
}
finally
{

            
//任何情况下都要释放锁

            
redisLock.unlock();

        
}   

    
}

}

  

看看使用RedisLock加锁后的的运行结果

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

lingsi

zhangsan

huangwu

lingsi

zhangsan

huangwu

lingsi

zhangsan

huangwu

lingsi

zhangsan

huangwu

lingsi

zhangsan

huangwu

......

  

可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。

问题

现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for idle object都没有出现线程“打架”的问题)。我认为RedLock算法仍有些问题没说清楚,比如,如何防止宕机时多个线程同时获得锁;RedLock算法在释放锁的处理上,不管线程是否获取锁成功,只要上了锁,就会到每个master节点上释放锁,这就会导致一个线程上的锁可能会被其他线程释放掉,这就和每个锁只能被获得锁的线程释放相互矛盾。这些有待后续进一步交流学习研究。

 

回到顶部

参考文档

http://redis.io/topics/distlock

http://ifeve.com/redis-lock/

 



 

作者:ITPSC
出处:http://www.cnblogs.com/hjwublog/

转载于:https://my.oschina.net/zyBourn/blog/1627687

  • 点赞
  • 收藏
  • 分享
  • 文章举报
chennieg27434 发布了0 篇原创文章 · 获赞 0 · 访问量 227 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐