springboot中redis的发布与订阅实现不同的逻辑处理
需求说明:
项目中要记录一下用户的积分和经验值,积分和经验值的来源有:登录、看文章、看课程。
现在我要把记录的功能抽取出来统一写代码,于是我就想到了用消息队列,但是项目中只引用了redis没有mq,于是就用光了redis的发布与订阅。
下面来看代码:
RedisReceiveType为常量类,用于记录处理的消息类型以及注册的处理器名称。
/** * @author: WeiMaoMao * @date: */ public class RedisReceiveType { /** * 文章 */ public static final String POPULARIZATION_LAW = "popularizationLawMessageAdapter"; /** * 课程 */ public static final String COURSE = "courseMessageAdapter"; /** * 登录 */ public static final String LOGIN = "loginMessageAdapter"; }
EmpiricalScoreReceive类为所有消息处理器的父级接口,RECEIVE_METHOD_NAME为处理消息的方法名,用于redis中adapter的反射。
/** * @author Administrator */ public interface EmpiricalScoreReceive { String RECEIVE_METHOD_NAME="receiveMessage"; /** * 消息处理方法 * @param message */ void receiveMessage(Object message); }
CourseMessageReceiveImpl、LoginMessageReceiveImpl、PopularizationLawMessageReceiveImpl为EmpiricalScoreReceive 的实现类,代码根据自己的业务逻辑处理,这里避免重复我只贴出一个实现类:
/** * 登录消息处理器 * @author Administrator */ @Service public class LoginMessageReceiveImpl implements EmpiricalScoreReceive { private final RedisUtils redisUtils; public LoginMessageReceiveImpl(RedisUtils redisUtils){ this.redisUtils = redisUtils; } @Override public void receiveMessage(Object user){ //根据redis的list中栈的特性,进行判断是否消息已经消费 Object userIdObject = redisUtils.lRightPop(RedisReceiveType.LOGIN); System.out.println("登录,消息被接收!"); //判断消息是否已经被消费 if (userIdObject == null) { System.out.println("登录,消息已经被消费喽!"); return; } System.out.println("登录,消息正在被消费!"); //这里需要注意一下序列化问题 FastJsonRedisSerializer seria = new FastJsonRedisSerializer(User.class); user = (User)seria.deserialize(user.getBytes()); System.out.println(user.getId()); } }
最重要的一个类RedisChannelConfig,此类为redis的配置类。
@Configuration public class RedisChannelConfig { /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Map<String,MessageListenerAdapter> listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅主题 listenerAdapter.forEach((clazzName,clazz)->{ //这个container 可以添加多个 messageListener container.addMessageListener(clazz, new PatternTopic(clazzName)); }); MessageListenerAdapter messageListenerAdapter = listenerAdapter.get(RedisReceiveType.LOGIN); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 法治实践活动适配器 * @param receiver * @return */ @Bean(RedisReceiveType.POPULARIZATION_LAW) MessageListenerAdapter popularizationLawListenerAdapter(PopularizationLawMessageReceiveImpl receiver) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME); } /** * 登录适配器 * @param receiver * @return */ @Bean(RedisReceiveType.LOGIN) MessageListenerAdapter loginMessageReceive(LoginMessageReceiveImpl receiver) { return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME); } /** * 课程适配器 * @param receiver * @return */ @Bean(name= RedisReceiveType.COURSE) MessageListenerAdapter courseListenerAdapter(CourseMessageReceiveImpl receiver) { return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME); } }
注意:此处引用的时候MessageListenerAdapter的时候用了一个Map,这样spring就会给我依赖注入所有的以注册时BeanName为key、MessageListenerAdapter对象为val的map。还有就是我的@Bean中定义的名字为固定的,便于我在使用的时候直接调用
使用方法:
User user=new User(); user.setId("123"); //添加list值,防止消息多次消费 redisUtils.listRightPush(RedisReceiveType.LOGIN,"1"); redisTemplate.convertAndSend(RedisReceiveType.LOGIN, user);
特别注意redis的序列化问题:
1、多端部署消息多次被消费问题。
我的解决方案是通过redis的list进行控制,发送时插入消费时取出。
2、序列化问题
redis的配置我使用的是FastJsonRedisSerializer序列化方式,所以上面在反序列化时用的也是FastJsonRedisSerializer。如果不使用反序列化传入的数据为String类型而且多加了双引号。
@Bean(name = "redisTemplate") @ConditionalOnMissingBean(name = "redisTemplate") public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); //序列化 FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class); // value值的序列化采用fastJsonRedisSerializer template.setValueSerializer(fastJsonRedisSerializer); template.setHashValueSerializer(fastJsonRedisSerializer); // 全局开启AutoType,这里方便开发,使用全局的方式 ParserConfig.getGlobalInstance().setAutoTypeSupport(true); // 建议使用这种方式,小范围指定白名单 // ParserConfig.getGlobalInstance().addAccept("com.starcity.domain"); // key的序列化采用StringRedisSerializer template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setConnectionFactory(redisConnectionFactory); return template; }
如果文章对您有帮助麻烦点个赞吧~
- java+redis+spring mvc实现发布订阅(不同项目间)
- spring-boot集成redis实现消息发布订阅模式,以及多个订阅模式的实现
- redis 消息队列发布订阅模式spring boot实现
- 【转】redis 消息队列发布订阅模式spring boot实现
- Spring boot+redis实现消息发布与订阅的代码
- Spring boot 使用Redis实现消息的订阅与发布
- Spring boot+redis实现消息发布与订阅
- SpringBoot2.0集成Redis的sub/pub(订阅/发布)功能实现获取Redis实时数据
- 使用Spring-Redis实现消息的发布/订阅
- spring-redis实现消息生产者发布和消费者订阅
- Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(一)
- Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(二)
- 【IM产品开发系列之Redis发布订阅】SpringBoot2.0中使用redis的发布订阅模式
- Spring Boot使用Redis进行消息的发布订阅
- 基于spring-redis发布订阅模式的实现
- 浅谈SpringBoot集成Redis实现缓存处理(Spring AOP实现)
- Redis系列-JAVA与redis整合-Spring Data Redis实现一个订阅/发布系统
- linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能 标签: hiredishiredis异步APIhiredis事件处理redis消息订阅发布redis c接口 2016-
- Spring Boot使用Redis进行消息的发布订阅
- Spring Boot使用Redis进行消息的发布订阅