您的位置:首页 > 数据库 > Redis

springboot2.x +redis使用和源码分析二(RedisTemplate)

2020-01-11 18:25 666 查看

目录

序言:本文讲述RedisTemplate对象如何构建以及该对象对于redis提供的功能的支持

1:定义RedisTemplate

2:基本使用Ddemo

3:RedisTemplate对Pipelined支持

4:RedisTemplate对事务支持

5:RedisTemplate对Lua语言支持

5.1:使用内置字符串形式

5.2:以文件的形式

5.3:使用Lua的一些应用场景

6:RedisTemplate对于redis中MQ功能支持

6.1:简单demo代码

6.2:走读MessageListenerAdapter源码

6.3:redis-MQ应用场景

 Demo代码:https://github.com/fangyuan94/redisDemo

6.2:走读MessageListenerAdapter源码

 Demo代码:https://github.com/fangyuan94/redisDemo

序言:本文讲述RedisTemplate对象如何构建以及该对象对于redis提供的功能的支持

在实际需求中我们会将用户的基础信息存放到redis作为缓存,在项目中我们定义PersonInfo用于存储用户信息

 

[code]@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PersonInfo implements Serializable {

private static final long serialVersionUID = -5666930682610937456L;

@NotNull
private String userId;

@NotNull
private String name;

@Max(100)
private Integer age;

@NotNull
private String sex;
}

如何优雅的操作person信息

1:定义RedisTemplate

[code]@Configuration
@AutoConfigureAfter(RedisCustomizerConfiguration.class)
public class RedisConfiguration {

@Bean
public RedisTemplate<String, PersonInfo> personInfoRedisTemplate(ObjectProvider<RedisConnectionFactory> redisConnectionFactory){

RedisTemplate<String, PersonInfo> personInfoRedisTemplate = new RedisTemplate<String, PersonInfo>();

personInfoRedisTemplate.setConnectionFactory(redisConnectionFactory.getObject());
//字符串序列化器
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//设置JDK序列化器
JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer(PersonInfo.class.getClassLoader());
//设置key value序列化器
personInfoRedisTemplate.setKeySerializer(stringRedisSerializer);
personInfoRedisTemplate.setValueSerializer(jdkSerializationRedisSerializer);
personInfoRedisTemplate.setHashKeySerializer(stringRedisSerializer);
personInfoRedisTemplate.setHashValueSerializer(jdkSerializationRedisSerializer);

return personInfoRedisTemplate;
}
}

2:基本使用Ddemo

[code]    //向redis中写数据(五种基本类型操作,和redis命令行操作基本一致)
@RequestMapping("addPersonInfo")
public Map<String,Object> addPersonInfo(){

String key = "personInfo";
PersonInfo personInfo = new PersonInfo();
personInfo.setUserId("1");
personInfo.setAge(18);
personInfo.setName("小花");
personInfo.setSex("女");
//操作string
personInfoRedisTemplate.opsForValue().set(key+"_str",personInfo);
//操作list
personInfoRedisTemplate.opsForList().leftPush(key+"list",personInfo);
//操作set
personInfoRedisTemplate.opsForSet().add(key+"_set",personInfo);
//操作有序set
personInfoRedisTemplate.opsForZSet().add(key+"_ZSet",personInfo,100);
//操作hash散列
personInfoRedisTemplate.opsForHash().put(key+"_Map",personInfo.getUserId(),personInfo);

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data","1");
return map;
}

@RequestMapping("getPersonInfo")
public Map<String,Object> getPersonInfo(){

String key = "personInfo";

//操作string
PersonInfo personInfo1 = personInfoRedisTemplate.opsForValue().get(key+"_str");
//操作list
PersonInfo personInfo2 = personInfoRedisTemplate.opsForList().leftPop(key+"list");
//操作set
PersonInfo personInfo3 =personInfoRedisTemplate.opsForSet().pop(key+"_set");
//操作有序set
Set<PersonInfo> personInfos = personInfoRedisTemplate.opsForZSet().range(key+"_ZSet",0,100);
//操作hash散列
PersonInfo personInfo5 = (PersonInfo) personInfoRedisTemplate.opsForHash().get(key+"_Map","1");

Map<String,Object> map = new HashMap<>(3);
map.put("str",personInfo1);
map.put("list",personInfo2);
map.put("set",personInfo3);
map.put("zSet",personInfos);
map.put("hash",personInfo5);
return map;
}

3:RedisTemplate对Pipelined支持

使用redis中pipelined可以优化批量处理需求的性能,不过pipelined不具有原子性,当执行到某一条命令时失败时会丢弃此条命令

测试Demo:

[code]/**
* RedisTemplate 对于pipeline支持
* 提供SessionCallback与RedisCallback两种 作用一样
* @return
*/
@RequestMapping("pipelineTest")
public  Map<String,Object> pipelineTest(){

//测试数据
final List<PersonInfo> personInfosTest = new ArrayList<>();

for (int i=0;i<50;i++){
PersonInfo personInfo = new PersonInfo();
personInfo.setUserId(""+i);
personInfo.setAge(i);
personInfo.setName("小花"+i);
personInfo.setSex("女");
personInfosTest.add(personInfo);
}

final String key = "personInfo_pipeline_";

//SessionCallback 属于高级 代码书写非常友好
personInfoRedisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public  Object execute(RedisOperations operations) throws DataAccessException {
for (int i=0;i<personInfosTest.size();i++){

PersonInfo personInfo = personInfosTest.get(i);
int j = i%5;
//使用不同命令
if(j==0){
operations.opsForValue().set(key+"_str",personInfo);
}else if(j==1){
operations.opsForHash().put(key+"_Map",personInfo.getUserId(),personInfo);
}else if(j==2){
operations.opsForList().leftPush(key+"list",personInfo);
}else if(j==3){
operations.opsForZSet().add(key+"_ZSet",personInfo,100);
}else {
operations.opsForSet().add(key+"_set",personInfo);
}
}
return null;
}
},personInfoRedisTemplate.getValueSerializer());

//RedisCallback偏底层些 处理byt[]类型
List<Object> ts = personInfoRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {

for (int i=0;i<personInfosTest.size();i++){

PersonInfo personInfo = personInfosTest.get(i);
int j = i%5;

RedisSerializer stringSerializer = personInfoRedisTemplate.getKeySerializer();

RedisSerializer valueSerializer = personInfoRedisTemplate.getValueSerializer();
//使用不同命令
if(j==0){
connection.set(stringSerializer.serialize(key+"str"),valueSerializer.serialize(personInfo));
}else if(j==1){
connection.hSet(stringSerializer.serialize(key+"hash"),stringSerializer.serialize(personInfo.getUserId()),valueSerializer.serialize(personInfo));
}else if(j==2){
connection.lPush(stringSerializer.serialize(key+"list"),valueSerializer.serialize(personInfo));
}else if(j==3){
connection.zAdd(stringSerializer.serialize(key+"zSet"),100,valueSerializer.serialize(personInfo));
}else {
connection.sAdd(stringSerializer.serialize(key+"set"),valueSerializer.serialize(personInfo));
}
}
//返回结果必须为null
return null;
}
},personInfoRedisTemplate.getValueSerializer());

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",ts);

return map;
}

查看结果可以看到操作不同命令返回结果是不一样:

[code]"data": [
true,
true,
1,
true,
1,
true,
true,
2,
true,
1,
。。。。。
],

4:RedisTemplate对事务支持

redis中的事务本质是一组命令的集合。在事务执行的过程会保证其它客户端提交的命令插入到当前序列中(排它性),这组命令会顺序执行,当个命令会是原子性的,但整组命令并不是原子性的,某个命令的失败(如果是错误是命令性错误类似于java编译性错误,则整个命令都不会执行)不会影响其它命令的执行也不会对之前执行的命令进行回滚。

redis提供了5组命令(

MULTI
EXEC
DISCARD
WATCH
UNWATCH)
对于事务支持

对于RedisTemplate中使用execute(SessionCallback)方法支持

demo代码:

[code]@RequestMapping("transactionalTest")
public  Map<String,Object> transactionalTest(){

//这里模拟用户取钱的场景 初始银行是10000 用户是10 每次用户取10
//这里通过压测工具模拟100个并发 5000个请求是否能保证数据的一致性
final String bank_key = "transactional_bank"; //默认为10000
final String person_key = "transactional_person";//默认为100
List<String> watchs = new ArrayList<>();
watchs.add(bank_key);
watchs.add(person_key);
//执行事务
personInfoRedisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
List rs;
do {
//监控需要变化key
operations.watch(watchs);
//对应multi命令
operations.multi();
operations.opsForValue().decrement(bank_key,10);
//需要执行命令
operations.opsForValue().increment(person_key,10);
//提交
rs = operations.exec();

System.out.println(rs);

//和CAS中自旋概念类似
}while (rs!=null && rs.size()==0);

return null;
}
});

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",1);

return map;
}

这里使用了简单的压测工具ab来进行测试: ab -n 5000 -c 100 'http://host.docker.internal:18670/transactionalTest'

最终结果为:

在100个并发情况下共5000个请求平均耗时为: 

5:RedisTemplate对Lua语言支持

上述说到了redis中的Pipelined和事务都支持多组命令一起执行但是使用Pipelined不保证整体命令的原子性,事务在某些条件下也不包含原子性。在redis2.8之后引入了新的方式可以解决此类问题——Lua,它在执行的时候是不会被中断的,整体具有原子性。因为它的这种特性,一般将它用于高并发的场景下。(对Lua语言不了解的可以现百度了解下这门轻型的脚本语言)

5.1:使用内置字符串形式

[code]     /**
* redis对于Lua支持
* @return
*/
@RequestMapping("LuaTest")
public  Map<String,Object> LuaTest(){

//创建脚本
DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript();

//redis.call  redis执行命令 KEYS代表redis中key ARGV代表参数 return指需要返回数据
String luaText = "redis.call('set',KEYS[1],ARGV[1]) return redis.call('get',KEYS[1])";
//设置脚本内容
defaultRedisScript.setScriptText(luaText);
//也可以通过将lue写入文件方式进行调用易于维护
//        defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/demo.lua")));
//设置脚本数据返回类型
defaultRedisScript.setResultType(String.class);
//执行命令
//第一个代表的需要执行的脚本
//第二对应着lua中的KEYS信息 KEYS下标从1开始
//第三个是对应ARGV中指 ARGV下标从1开始
String rs = stringRedisTemplate.execute(defaultRedisScript,Collections.singletonList("lua_test"),"测试lua数据");

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",rs);

return map;
}

返回结果:

[code]{
"data": "测试lua数据",
"success": true
}

5.2:以文件的形式

如果需要执行的逻辑较复杂,此时我们可以通过将lua写入文件的方式进行处理例如(去银行存钱)。

需求:某银行现余额10000元,现存在一批人(100)向银行存储共8000次,每次一元,使用lua脚本是否可以保证数据一致性

代码:

[code]/**
* 测试
* @return
*/
@RequestMapping("LuaFileTest")
public  Map<String,Object> LuaFileTest(){

//创建脚本
DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript();

//也可以通过将lue写入文件方式进行调用易于维护
defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/demo.lua")));
//设置脚本数据返回类型
defaultRedisScript.setResultType(String.class);
//redis 需要操作key
List<String> keys = new ArrayList<>();
keys.add("lua_bank");
keys.add("lua_person");
//
Object[] args = new Object[]{"10000","10000","1"} ;
//执行命令
//第一个代表的需要执行的脚本
//第二对应着lua中的KEYS信息 KEYS下标从1开始
//第三个是对应ARGV中指 ARGV下标从1开始
String rs = stringRedisTemplate.execute(defaultRedisScript,keys,args);

System.out.println("-------结果集-----"+rs);

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",rs);

return map;
}

一样使用ab压测进行模拟测试: ab -n 8000 -c 100 'http://host.docker.internal:18670/LuaFileTest'

最终结果与预期一致,可以保证数据的一致性:

在100个并发情况下共8000个请求平均耗时为:  

整体的吞吐量高于事务

5.3:使用Lua的一些应用场景

场景一:使用redis构建分布式锁

场景二:针对于电商中秒杀场景下保证数据一致性

假设某电商中某店家为某一个产品做秒杀,每一个用户只能在规定时间内抢夺总共50个商品,且每人限购一份,如何在高并发下保证商品的数量不会出现溢出的现象,这里使用redis结合lue来实现。

java代码:

[code]/**
* 高并发下秒杀商品代码实现
* @param userId 这里是模拟代码 所以userId 以参数的形式
* @return
*/
@RequestMapping("seckillMerchandise")
public  Map<String,Object> seckillMerchandise(@RequestParam("userId") String userId){

//创建脚本
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript();

//也可以通过将lue写入文件方式进行调用易于维护
defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/seckillMerchandise.lua")));
//设置脚本数据返回类型
defaultRedisScript.setResultType(Long.class);
//redis 需要操作key
List<String> keys = new ArrayList<>();
//记录商品总数 这里测试数据20个
keys.add("seckill_merchandise_count"+"_spid");
//记录抢购到的用户记录
keys.add("seckill_merchandise_user"+"_spid");

//用户id
Object[] args = new Object[]{userId,System.currentTimeMillis()+""} ;
//执行命令
//第一个代表的需要执行的脚本
//第二对应着lua中的KEYS信息 KEYS下标从1开始
//第三个是对应ARGV中指 ARGV下标从1开始
Long rs = stringRedisTemplate.execute(defaultRedisScript,keys,args);
//根据不同结果进行不同处理 ---
System.out.println("-------结果集-----:"+rs);

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",rs);

return map;
}

lua代码:

[code]--秒杀商品lua代码
--返回状态:
-- 0 当前商品已售空
-- -1 该用户已经抢购了该商品了
-- -2 代表存储秒杀商品总数的key不存在 非法数据无法处理或者抢购时间已结束
-- 1代表正常返回
--用户key
local userId = ARGV[1]
--存储秒杀商品总数的key 该值在设定抢购开始时写入缓存 并设置过期时间
local seckillMerchandiseCountKey = tostring(KEYS[1])
--目前商品剩余总数
local count =  redis.call('GET',seckillMerchandiseCountKey)

if count == false then
--代表存储秒杀商品总数的key不存在 非法数据无法处理或者抢购时间已结束
return -2
end
--转换为数值类型
count = tonumber(count)
if count  <= 0 then
--代表当前商品已售空
return 0
end

--记录已抢够成功用户列表 用于判断用户是否抢购
--因为秒杀活动不是针对于一种商品 所以这里以商品id+一些唯一标示组合为key value为hash散列
local seckillMerchandiseUserKey = tostring(KEYS[2])

local flag = redis.call('HGET',seckillMerchandiseUserKey,userId)
if flag == true then
--该用户已经抢购了该商品了
return -1
end
--该用户抢购了该商品了 需要对数据进行变更
--商品 -1
redis.call('DECRBY',seckillMerchandiseCountKey,1)
--将用户写入 并记录抢购成功时间
redis.call('HSET',seckillMerchandiseUserKey,userId,ARGV[2])
return 1

模拟测试代码:

[code]#!/bin/bash
###使用ab构建测试脚本 因为博主用的是mac电脑 使用docker 运行ab的所以这里主机名用/、host.docker.internal
for (( i = 0; i < 100; i++ )); do
ab -n 3 -c 1 'http://host.docker.internal:18670/seckillMerchandise?userId=userId_'+i
done

最终结果(在100个并发下共300个请求下并不存在超卖的情况):

其实上述只是描述下lua的使用的一些业务场景,真正的秒杀业务设计没这么简单。后续会针对于秒杀系统讲述自己的一些想法

6:RedisTemplate对于redis中MQ功能支持

redis提供轻量级的mq功能

6.1:简单demo代码

[code]/**
* 用于初始化依赖类
* @author fangyuan
*/
@Configuration
public class RedisListenerConfiguration {

/**
* MessageListenerAdapter此类需要被注入到spring中执行
* 因为MessageListenerAdapter实现了InitializingBean接口 执行afterPropertiesSet()
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(){
//将代理类注入到MessageListenerAdapter 代理执行
return new MessageListenerAdapter(new Recevetor(),"receve");
}

/**
* 创建监听容器 用于管理监听器
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
StringRedisSerializer stringRedisSerializer,
MessageListenerAdapter messageListenerAdapter,
ObjectProvider<RedisConnectionFactory> redisConnectionFactory){

RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
//设置基础配置
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory.getObject());
redisMessageListenerContainer.setBeanName("redisMessageListenerContainer");
redisMessageListenerContainer.setTopicSerializer(stringRedisSerializer);
//创建多监听者 监听不同topic
//使用自定义监听方式
MessageListener messageListener =  new ConsumerMessageListener();
redisMessageListenerContainer.addMessageListener(messageListener,new  ChannelTopic("redis_mq_test_1"));
//使用适配器方式(通过代理执行设计模式+反射来做到)
messageListenerAdapter.setStringSerializer(stringRedisSerializer);
messageListenerAdapter.setSerializer(stringRedisSerializer);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter,new ChannelTopic("redis_mq_test_2"));

return redisMessageListenerContainer;
}
}

定义两种接受方式:

[code]/**
* 监听消息
* @author fangyuan
*/
public class ConsumerMessageListener implements MessageListener {

private StringRedisSerializer stringRedisSerializer;

public  ConsumerMessageListener(){
stringRedisSerializer = new StringRedisSerializer();
}

@Override
public void onMessage(Message message, byte[] pattern) {

String data = stringRedisSerializer.deserialize(message.getBody());

String channel = stringRedisSerializer.deserialize(message.getChannel());
System.out.println("从"+channel+"中---收到消息----:"+data);

}
}

 

[code]
/**
* 定义被MessageListenerAdapter被代理执行类
* @author fangyuan
*/
public class Recevetor {

/**
* 接受message 此方法的参数只能为两个参数message,topic) 或者是一个参数(message)
* @param message
*/
public void receve(String message,String channel){

System.out.println("通过MessageListenerAdapter代理方式从"+channel+"中---收到消息----:"+message);
}
}

测试消息类:

[code]/**
* 发送信息到mq
* @return
*/
@RequestMapping("produceMessage")
public  Map<String,Object> produceMessage(){

String channel = "redis_mq_test_1";

int count = atomicInteger.incrementAndGet();

if(count%2==0){
channel = "redis_mq_test_2";
}

String message = "测试使用redis作为MQ发送数据__"+count;
//发布信息
stringRedisTemplate.convertAndSend(channel,message);

Map<String,Object> map = new HashMap<>(3);
map.put("success",true);
map.put("data",1);

return map;
}

 最终结果:

6.2:走读MessageListenerAdapter源码

此类通过代理设计模式+反射调用来实现需求

[code]//此类需要被注入到spring中 因为实现了该接口InitializingBean 需要初始化MethodInvoker对象
public void afterPropertiesSet() {
String methodName = getDefaultListenerMethod();

if (!StringUtils.hasText(methodName)) {
throw new InvalidDataAccessApiUsageException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
}

invoker = new MethodInvoker(delegate, methodName);
}

//实现MessageListener接口
public void onMessage(Message message, @Nullable byte[] pattern) {
try {

//判断代理类本质是否是MessageListener类如果是 强制转换下调用即可
if (delegate != this) {
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message, pattern);
return;
}
}

//获取message 和channel信息
Object convertedMessage = extractMessage(message);
String convertedChannel = stringSerializer.deserialize(pattern);
// 组装成Invoke调用方法参数形式
Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };
//通过反射 调用代理类指定方法
invokeListenerMethod(invoker.getMethodName(), listenerArguments);
} catch (Throwable th) {
handleListenerException(th);
}
}

6.3:redis-MQ应用场景

redis-MQ作为一个轻量型mq,它的应用场景是和redis本质有很大关系,因为redis的核心技术在于内存的使用,那么代表着redis-MQ的实时性很高;但也带来着对应的缺点,例如无法存储大量信息(内存资源宝贵);信息的可用性难保证(第一种情况:消息被发送,如果没有订阅者接收,那么消息就会丢失;第二种情况:redis挂掉此条消息存在丢失数据可能);不支持断点消费;每个消费者会消费所有数据(无法支持组消费或者说不支持负载均衡消费,会存在单点压力过大)。所以针对需求技术选型的时候需要考虑上述问题。

 Demo代码:https://github.com/fangyuan94/redisDemo

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38 篇原创文章 · 获赞 47 · 访问量 1044 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: