springboot2.x +redis使用和源码分析二(RedisTemplate)
目录
序言:本文讲述RedisTemplate对象如何构建以及该对象对于redis提供的功能的支持
6.2:走读MessageListenerAdapter源码
Demo代码:https://github.com/fangyuan94/redisDemo
6.2:走读MessageListenerAdapter源码
Demo代码:https://github.com/fangyuan94/redisDemo
序言:本文讲述RedisTemplate对象如何构建以及该对象对于redis提供的功能的支持
在实际需求中我们会将用户的基础信息存放到redis作为缓存,在项目中我们定义PersonInfo用于存储用户信息
[code]@Getter @Setter @Builder @NoArgsConstructor @AllArgsConstructor public class PersonInfo implements Serializable { private static final long serialVersionUID = -5666930682610937456L; @NotNull private String userId; @NotNull private String name; @Max(100) private Integer age; @NotNull private String sex; }
如何优雅的操作person信息
1:定义RedisTemplate
[code]@Configuration @AutoConfigureAfter(RedisCustomizerConfiguration.class) public class RedisConfiguration { @Bean public RedisTemplate<String, PersonInfo> personInfoRedisTemplate(ObjectProvider<RedisConnectionFactory> redisConnectionFactory){ RedisTemplate<String, PersonInfo> personInfoRedisTemplate = new RedisTemplate<String, PersonInfo>(); personInfoRedisTemplate.setConnectionFactory(redisConnectionFactory.getObject()); //字符串序列化器 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); //设置JDK序列化器 JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer(PersonInfo.class.getClassLoader()); //设置key value序列化器 personInfoRedisTemplate.setKeySerializer(stringRedisSerializer); personInfoRedisTemplate.setValueSerializer(jdkSerializationRedisSerializer); personInfoRedisTemplate.setHashKeySerializer(stringRedisSerializer); personInfoRedisTemplate.setHashValueSerializer(jdkSerializationRedisSerializer); return personInfoRedisTemplate; } }
2:基本使用Ddemo
[code] //向redis中写数据(五种基本类型操作,和redis命令行操作基本一致) @RequestMapping("addPersonInfo") public Map<String,Object> addPersonInfo(){ String key = "personInfo"; PersonInfo personInfo = new PersonInfo(); personInfo.setUserId("1"); personInfo.setAge(18); personInfo.setName("小花"); personInfo.setSex("女"); //操作string personInfoRedisTemplate.opsForValue().set(key+"_str",personInfo); //操作list personInfoRedisTemplate.opsForList().leftPush(key+"list",personInfo); //操作set personInfoRedisTemplate.opsForSet().add(key+"_set",personInfo); //操作有序set personInfoRedisTemplate.opsForZSet().add(key+"_ZSet",personInfo,100); //操作hash散列 personInfoRedisTemplate.opsForHash().put(key+"_Map",personInfo.getUserId(),personInfo); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data","1"); return map; } @RequestMapping("getPersonInfo") public Map<String,Object> getPersonInfo(){ String key = "personInfo"; //操作string PersonInfo personInfo1 = personInfoRedisTemplate.opsForValue().get(key+"_str"); //操作list PersonInfo personInfo2 = personInfoRedisTemplate.opsForList().leftPop(key+"list"); //操作set PersonInfo personInfo3 =personInfoRedisTemplate.opsForSet().pop(key+"_set"); //操作有序set Set<PersonInfo> personInfos = personInfoRedisTemplate.opsForZSet().range(key+"_ZSet",0,100); //操作hash散列 PersonInfo personInfo5 = (PersonInfo) personInfoRedisTemplate.opsForHash().get(key+"_Map","1"); Map<String,Object> map = new HashMap<>(3); map.put("str",personInfo1); map.put("list",personInfo2); map.put("set",personInfo3); map.put("zSet",personInfos); map.put("hash",personInfo5); return map; }
3:RedisTemplate对Pipelined支持
使用redis中pipelined可以优化批量处理需求的性能,不过pipelined不具有原子性,当执行到某一条命令时失败时会丢弃此条命令
测试Demo:
[code]/** * RedisTemplate 对于pipeline支持 * 提供SessionCallback与RedisCallback两种 作用一样 * @return */ @RequestMapping("pipelineTest") public Map<String,Object> pipelineTest(){ //测试数据 final List<PersonInfo> personInfosTest = new ArrayList<>(); for (int i=0;i<50;i++){ PersonInfo personInfo = new PersonInfo(); personInfo.setUserId(""+i); personInfo.setAge(i); personInfo.setName("小花"+i); personInfo.setSex("女"); personInfosTest.add(personInfo); } final String key = "personInfo_pipeline_"; //SessionCallback 属于高级 代码书写非常友好 personInfoRedisTemplate.executePipelined(new SessionCallback<Object>() { @Override public Object execute(RedisOperations operations) throws DataAccessException { for (int i=0;i<personInfosTest.size();i++){ PersonInfo personInfo = personInfosTest.get(i); int j = i%5; //使用不同命令 if(j==0){ operations.opsForValue().set(key+"_str",personInfo); }else if(j==1){ operations.opsForHash().put(key+"_Map",personInfo.getUserId(),personInfo); }else if(j==2){ operations.opsForList().leftPush(key+"list",personInfo); }else if(j==3){ operations.opsForZSet().add(key+"_ZSet",personInfo,100); }else { operations.opsForSet().add(key+"_set",personInfo); } } return null; } },personInfoRedisTemplate.getValueSerializer()); //RedisCallback偏底层些 处理byt[]类型 List<Object> ts = personInfoRedisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { for (int i=0;i<personInfosTest.size();i++){ PersonInfo personInfo = personInfosTest.get(i); int j = i%5; RedisSerializer stringSerializer = personInfoRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = personInfoRedisTemplate.getValueSerializer(); //使用不同命令 if(j==0){ connection.set(stringSerializer.serialize(key+"str"),valueSerializer.serialize(personInfo)); }else if(j==1){ connection.hSet(stringSerializer.serialize(key+"hash"),stringSerializer.serialize(personInfo.getUserId()),valueSerializer.serialize(personInfo)); }else if(j==2){ connection.lPush(stringSerializer.serialize(key+"list"),valueSerializer.serialize(personInfo)); }else if(j==3){ connection.zAdd(stringSerializer.serialize(key+"zSet"),100,valueSerializer.serialize(personInfo)); }else { connection.sAdd(stringSerializer.serialize(key+"set"),valueSerializer.serialize(personInfo)); } } //返回结果必须为null return null; } },personInfoRedisTemplate.getValueSerializer()); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",ts); return map; }
查看结果可以看到操作不同命令返回结果是不一样:
[code]"data": [ true, true, 1, true, 1, true, true, 2, true, 1, 。。。。。 ],
4:RedisTemplate对事务支持
redis中的事务本质是一组命令的集合。在事务执行的过程会保证其它客户端提交的命令插入到当前序列中(排它性),这组命令会顺序执行,当个命令会是原子性的,但整组命令并不是原子性的,某个命令的失败(如果是错误是命令性错误类似于java编译性错误,则整个命令都不会执行)不会影响其它命令的执行也不会对之前执行的命令进行回滚。
redis提供了5组命令(
MULTI,
EXEC,
DISCARD,
WATCH和
UNWATCH)对于事务支持
对于RedisTemplate中使用execute(SessionCallback)方法支持
demo代码:
[code]@RequestMapping("transactionalTest") public Map<String,Object> transactionalTest(){ //这里模拟用户取钱的场景 初始银行是10000 用户是10 每次用户取10 //这里通过压测工具模拟100个并发 5000个请求是否能保证数据的一致性 final String bank_key = "transactional_bank"; //默认为10000 final String person_key = "transactional_person";//默认为100 List<String> watchs = new ArrayList<>(); watchs.add(bank_key); watchs.add(person_key); //执行事务 personInfoRedisTemplate.execute(new SessionCallback() { @Override public Object execute(RedisOperations operations) throws DataAccessException { List rs; do { //监控需要变化key operations.watch(watchs); //对应multi命令 operations.multi(); operations.opsForValue().decrement(bank_key,10); //需要执行命令 operations.opsForValue().increment(person_key,10); //提交 rs = operations.exec(); System.out.println(rs); //和CAS中自旋概念类似 }while (rs!=null && rs.size()==0); return null; } }); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",1); return map; }
这里使用了简单的压测工具ab来进行测试: ab -n 5000 -c 100 'http://host.docker.internal:18670/transactionalTest'
最终结果为:
在100个并发情况下共5000个请求平均耗时为:
5:RedisTemplate对Lua语言支持
上述说到了redis中的Pipelined和事务都支持多组命令一起执行但是使用Pipelined不保证整体命令的原子性,事务在某些条件下也不包含原子性。在redis2.8之后引入了新的方式可以解决此类问题——Lua,它在执行的时候是不会被中断的,整体具有原子性。因为它的这种特性,一般将它用于高并发的场景下。(对Lua语言不了解的可以现百度了解下这门轻型的脚本语言)
5.1:使用内置字符串形式
[code] /** * redis对于Lua支持 * @return */ @RequestMapping("LuaTest") public Map<String,Object> LuaTest(){ //创建脚本 DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript(); //redis.call redis执行命令 KEYS代表redis中key ARGV代表参数 return指需要返回数据 String luaText = "redis.call('set',KEYS[1],ARGV[1]) return redis.call('get',KEYS[1])"; //设置脚本内容 defaultRedisScript.setScriptText(luaText); //也可以通过将lue写入文件方式进行调用易于维护 // defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/demo.lua"))); //设置脚本数据返回类型 defaultRedisScript.setResultType(String.class); //执行命令 //第一个代表的需要执行的脚本 //第二对应着lua中的KEYS信息 KEYS下标从1开始 //第三个是对应ARGV中指 ARGV下标从1开始 String rs = stringRedisTemplate.execute(defaultRedisScript,Collections.singletonList("lua_test"),"测试lua数据"); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",rs); return map; }
返回结果:
[code]{ "data": "测试lua数据", "success": true }
5.2:以文件的形式
如果需要执行的逻辑较复杂,此时我们可以通过将lua写入文件的方式进行处理例如(去银行存钱)。
需求:某银行现余额10000元,现存在一批人(100)向银行存储共8000次,每次一元,使用lua脚本是否可以保证数据一致性
代码:
[code]/** * 测试 * @return */ @RequestMapping("LuaFileTest") public Map<String,Object> LuaFileTest(){ //创建脚本 DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript(); //也可以通过将lue写入文件方式进行调用易于维护 defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/demo.lua"))); //设置脚本数据返回类型 defaultRedisScript.setResultType(String.class); //redis 需要操作key List<String> keys = new ArrayList<>(); keys.add("lua_bank"); keys.add("lua_person"); // Object[] args = new Object[]{"10000","10000","1"} ; //执行命令 //第一个代表的需要执行的脚本 //第二对应着lua中的KEYS信息 KEYS下标从1开始 //第三个是对应ARGV中指 ARGV下标从1开始 String rs = stringRedisTemplate.execute(defaultRedisScript,keys,args); System.out.println("-------结果集-----"+rs); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",rs); return map; }
一样使用ab压测进行模拟测试: ab -n 8000 -c 100 'http://host.docker.internal:18670/LuaFileTest'
最终结果与预期一致,可以保证数据的一致性:
在100个并发情况下共8000个请求平均耗时为:
整体的吞吐量高于事务
5.3:使用Lua的一些应用场景
场景一:使用redis构建分布式锁
场景二:针对于电商中秒杀场景下保证数据一致性
假设某电商中某店家为某一个产品做秒杀,每一个用户只能在规定时间内抢夺总共50个商品,且每人限购一份,如何在高并发下保证商品的数量不会出现溢出的现象,这里使用redis结合lue来实现。
java代码:
[code]/** * 高并发下秒杀商品代码实现 * @param userId 这里是模拟代码 所以userId 以参数的形式 * @return */ @RequestMapping("seckillMerchandise") public Map<String,Object> seckillMerchandise(@RequestParam("userId") String userId){ //创建脚本 DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript(); //也可以通过将lue写入文件方式进行调用易于维护 defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/seckillMerchandise.lua"))); //设置脚本数据返回类型 defaultRedisScript.setResultType(Long.class); //redis 需要操作key List<String> keys = new ArrayList<>(); //记录商品总数 这里测试数据20个 keys.add("seckill_merchandise_count"+"_spid"); //记录抢购到的用户记录 keys.add("seckill_merchandise_user"+"_spid"); //用户id Object[] args = new Object[]{userId,System.currentTimeMillis()+""} ; //执行命令 //第一个代表的需要执行的脚本 //第二对应着lua中的KEYS信息 KEYS下标从1开始 //第三个是对应ARGV中指 ARGV下标从1开始 Long rs = stringRedisTemplate.execute(defaultRedisScript,keys,args); //根据不同结果进行不同处理 --- System.out.println("-------结果集-----:"+rs); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",rs); return map; }
lua代码:
[code]--秒杀商品lua代码 --返回状态: -- 0 当前商品已售空 -- -1 该用户已经抢购了该商品了 -- -2 代表存储秒杀商品总数的key不存在 非法数据无法处理或者抢购时间已结束 -- 1代表正常返回 --用户key local userId = ARGV[1] --存储秒杀商品总数的key 该值在设定抢购开始时写入缓存 并设置过期时间 local seckillMerchandiseCountKey = tostring(KEYS[1]) --目前商品剩余总数 local count = redis.call('GET',seckillMerchandiseCountKey) if count == false then --代表存储秒杀商品总数的key不存在 非法数据无法处理或者抢购时间已结束 return -2 end --转换为数值类型 count = tonumber(count) if count <= 0 then --代表当前商品已售空 return 0 end --记录已抢够成功用户列表 用于判断用户是否抢购 --因为秒杀活动不是针对于一种商品 所以这里以商品id+一些唯一标示组合为key value为hash散列 local seckillMerchandiseUserKey = tostring(KEYS[2]) local flag = redis.call('HGET',seckillMerchandiseUserKey,userId) if flag == true then --该用户已经抢购了该商品了 return -1 end --该用户抢购了该商品了 需要对数据进行变更 --商品 -1 redis.call('DECRBY',seckillMerchandiseCountKey,1) --将用户写入 并记录抢购成功时间 redis.call('HSET',seckillMerchandiseUserKey,userId,ARGV[2]) return 1
模拟测试代码:
[code]#!/bin/bash ###使用ab构建测试脚本 因为博主用的是mac电脑 使用docker 运行ab的所以这里主机名用/、host.docker.internal for (( i = 0; i < 100; i++ )); do ab -n 3 -c 1 'http://host.docker.internal:18670/seckillMerchandise?userId=userId_'+i done
最终结果(在100个并发下共300个请求下并不存在超卖的情况):
其实上述只是描述下lua的使用的一些业务场景,真正的秒杀业务设计没这么简单。后续会针对于秒杀系统讲述自己的一些想法
6:RedisTemplate对于redis中MQ功能支持
redis提供轻量级的mq功能
6.1:简单demo代码
[code]/** * 用于初始化依赖类 * @author fangyuan */ @Configuration public class RedisListenerConfiguration { /** * MessageListenerAdapter此类需要被注入到spring中执行 * 因为MessageListenerAdapter实现了InitializingBean接口 执行afterPropertiesSet() * @return */ @Bean public MessageListenerAdapter messageListenerAdapter(){ //将代理类注入到MessageListenerAdapter 代理执行 return new MessageListenerAdapter(new Recevetor(),"receve"); } /** * 创建监听容器 用于管理监听器 * @param redisConnectionFactory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer( StringRedisSerializer stringRedisSerializer, MessageListenerAdapter messageListenerAdapter, ObjectProvider<RedisConnectionFactory> redisConnectionFactory){ RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); //设置基础配置 redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory.getObject()); redisMessageListenerContainer.setBeanName("redisMessageListenerContainer"); redisMessageListenerContainer.setTopicSerializer(stringRedisSerializer); //创建多监听者 监听不同topic //使用自定义监听方式 MessageListener messageListener = new ConsumerMessageListener(); redisMessageListenerContainer.addMessageListener(messageListener,new ChannelTopic("redis_mq_test_1")); //使用适配器方式(通过代理执行设计模式+反射来做到) messageListenerAdapter.setStringSerializer(stringRedisSerializer); messageListenerAdapter.setSerializer(stringRedisSerializer); redisMessageListenerContainer.addMessageListener(messageListenerAdapter,new ChannelTopic("redis_mq_test_2")); return redisMessageListenerContainer; } }
定义两种接受方式:
[code]/** * 监听消息 * @author fangyuan */ public class ConsumerMessageListener implements MessageListener { private StringRedisSerializer stringRedisSerializer; public ConsumerMessageListener(){ stringRedisSerializer = new StringRedisSerializer(); } @Override public void onMessage(Message message, byte[] pattern) { String data = stringRedisSerializer.deserialize(message.getBody()); String channel = stringRedisSerializer.deserialize(message.getChannel()); System.out.println("从"+channel+"中---收到消息----:"+data); } }
[code] /** * 定义被MessageListenerAdapter被代理执行类 * @author fangyuan */ public class Recevetor { /** * 接受message 此方法的参数只能为两个参数message,topic) 或者是一个参数(message) * @param message */ public void receve(String message,String channel){ System.out.println("通过MessageListenerAdapter代理方式从"+channel+"中---收到消息----:"+message); } }
测试消息类:
[code]/** * 发送信息到mq * @return */ @RequestMapping("produceMessage") public Map<String,Object> produceMessage(){ String channel = "redis_mq_test_1"; int count = atomicInteger.incrementAndGet(); if(count%2==0){ channel = "redis_mq_test_2"; } String message = "测试使用redis作为MQ发送数据__"+count; //发布信息 stringRedisTemplate.convertAndSend(channel,message); Map<String,Object> map = new HashMap<>(3); map.put("success",true); map.put("data",1); return map; }
最终结果:
6.2:走读MessageListenerAdapter源码
此类通过代理设计模式+反射调用来实现需求
[code]//此类需要被注入到spring中 因为实现了该接口InitializingBean 需要初始化MethodInvoker对象 public void afterPropertiesSet() { String methodName = getDefaultListenerMethod(); if (!StringUtils.hasText(methodName)) { throw new InvalidDataAccessApiUsageException("No default listener method specified: " + "Either specify a non-null value for the 'defaultListenerMethod' property or " + "override the 'getListenerMethodName' method."); } invoker = new MethodInvoker(delegate, methodName); } //实现MessageListener接口 public void onMessage(Message message, @Nullable byte[] pattern) { try { //判断代理类本质是否是MessageListener类如果是 强制转换下调用即可 if (delegate != this) { if (delegate instanceof MessageListener) { ((MessageListener) delegate).onMessage(message, pattern); return; } } //获取message 和channel信息 Object convertedMessage = extractMessage(message); String convertedChannel = stringSerializer.deserialize(pattern); // 组装成Invoke调用方法参数形式 Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel }; //通过反射 调用代理类指定方法 invokeListenerMethod(invoker.getMethodName(), listenerArguments); } catch (Throwable th) { handleListenerException(th); } }
6.3:redis-MQ应用场景
redis-MQ作为一个轻量型mq,它的应用场景是和redis本质有很大关系,因为redis的核心技术在于内存的使用,那么代表着redis-MQ的实时性很高;但也带来着对应的缺点,例如无法存储大量信息(内存资源宝贵);信息的可用性难保证(第一种情况:消息被发送,如果没有订阅者接收,那么消息就会丢失;第二种情况:redis挂掉此条消息存在丢失数据可能);不支持断点消费;每个消费者会消费所有数据(无法支持组消费或者说不支持负载均衡消费,会存在单点压力过大)。所以针对需求技术选型的时候需要考虑上述问题。
Demo代码:https://github.com/fangyuan94/redisDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
- springboot2.x +redis使用和源码分析三(序列化器)
- 分析“备忘使用spring-data-redis中的redistemplate的一个大坑”
- Redis详解 - SpringBoot整合Redis,RedisTemplate和注解两种方式的使用
- Springboot 2使用外部Tomcat源码分析
- Spring boot2.X 简单尝试 RedisTemplate 实现分布式锁
- spring boot中使用resid-RedisTemplate
- springboot使用protostuff进行序列化和反序列化整合redis的redisTemplate的各种方法的写法
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- Spring-boot通过redisTemplate使用redis(无须手动序列化)
- springboot源码分析16-spring boot监听器使用
- SpringBoot2.X 整合RedisTemplate 简单实现消息队列
- Spring boot下配置使用redis--template编码形式
- springboot源码分析10-ApplicationContextInitializer使用
- SpringBoot整合redis集群并使用StringRedisTemplate和RedisTemplate简单操作Redis集群
- Spring-boot通过redisTemplate使用redis(无须手动序列化)
- Spring boot 使用多个RedisTemplate
- SpringBoot2.x使用Redis实现缓存入门
- 1 Springboot中使用redis,自动缓存、更新、删除
- spring boot 源码分析(一) 案例
- spring boot-使用redis的Keyspace Notifications实现定时任务队列