您的位置:首页 > 数据库 > Redis

springboot中redis的发布与订阅实现不同的逻辑处理

2020-06-07 04:30 99 查看

需求说明:

项目中要记录一下用户的积分和经验值,积分和经验值的来源有:登录、看文章、看课程。
现在我要把记录的功能抽取出来统一写代码,于是我就想到了用消息队列,但是项目中只引用了redis没有mq,于是就用光了redis的发布与订阅。

下面来看代码:
RedisReceiveType为常量类,用于记录处理的消息类型以及注册的处理器名称。

/**
* @author: WeiMaoMao
* @date:
*/
public class RedisReceiveType {
/**
* 文章
*/
public static final String POPULARIZATION_LAW = "popularizationLawMessageAdapter";
/**
* 课程
*/
public static final String COURSE = "courseMessageAdapter";
/**
* 登录
*/
public static final String LOGIN = "loginMessageAdapter";
}

EmpiricalScoreReceive类为所有消息处理器的父级接口,RECEIVE_METHOD_NAME为处理消息的方法名,用于redis中adapter的反射。

/**
* @author Administrator
*/
public interface EmpiricalScoreReceive {

String RECEIVE_METHOD_NAME="receiveMessage";
/**
* 消息处理方法
* @param message
*/
void receiveMessage(Object message);
}

CourseMessageReceiveImpl、LoginMessageReceiveImpl、PopularizationLawMessageReceiveImpl为EmpiricalScoreReceive 的实现类,代码根据自己的业务逻辑处理,这里避免重复我只贴出一个实现类:

/**
* 登录消息处理器
* @author Administrator
*/
@Service
public class LoginMessageReceiveImpl implements EmpiricalScoreReceive {
private final RedisUtils redisUtils;
public LoginMessageReceiveImpl(RedisUtils redisUtils){
this.redisUtils = redisUtils;
}
@Override
public void receiveMessage(Object user){
//根据redis的list中栈的特性,进行判断是否消息已经消费
Object userIdObject = redisUtils.lRightPop(RedisReceiveType.LOGIN);
System.out.println("登录,消息被接收!");
//判断消息是否已经被消费
if (userIdObject == null) {
System.out.println("登录,消息已经被消费喽!");
return;
}
System.out.println("登录,消息正在被消费!");
//这里需要注意一下序列化问题
FastJsonRedisSerializer seria = new FastJsonRedisSerializer(User.class);
user = (User)seria.deserialize(user.getBytes());
System.out.println(user.getId());
}
}

最重要的一个类RedisChannelConfig,此类为redis的配置类。

@Configuration
public class RedisChannelConfig {

/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
Map<String,MessageListenerAdapter> listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅主题
listenerAdapter.forEach((clazzName,clazz)->{
//这个container 可以添加多个 messageListener
container.addMessageListener(clazz, new PatternTopic(clazzName));
});
MessageListenerAdapter messageListenerAdapter = listenerAdapter.get(RedisReceiveType.LOGIN);
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 法治实践活动适配器
* @param receiver
* @return
*/
@Bean(RedisReceiveType.POPULARIZATION_LAW)
MessageListenerAdapter popularizationLawListenerAdapter(PopularizationLawMessageReceiveImpl receiver) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看

return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME);
}
/**
* 登录适配器
* @param receiver
* @return
*/
@Bean(RedisReceiveType.LOGIN)
MessageListenerAdapter loginMessageReceive(LoginMessageReceiveImpl receiver) {
return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME);
}
/**
* 课程适配器
* @param receiver
* @return
*/
@Bean(name= RedisReceiveType.COURSE)
MessageListenerAdapter courseListenerAdapter(CourseMessageReceiveImpl receiver) {
return new MessageListenerAdapter(receiver, EmpiricalScoreReceive.RECEIVE_METHOD_NAME);
}
}

注意:此处引用的时候MessageListenerAdapter的时候用了一个Map,这样spring就会给我依赖注入所有的以注册时BeanName为key、MessageListenerAdapter对象为val的map。还有就是我的@Bean中定义的名字为固定的,便于我在使用的时候直接调用

使用方法:

User user=new User();
user.setId("123");
//添加list值,防止消息多次消费
redisUtils.listRightPush(RedisReceiveType.LOGIN,"1");
redisTemplate.convertAndSend(RedisReceiveType.LOGIN, user);

特别注意redis的序列化问题:
1、多端部署消息多次被消费问题。
我的解决方案是通过redis的list进行控制,发送时插入消费时取出。
2、序列化问题
redis的配置我使用的是FastJsonRedisSerializer序列化方式,所以上面在反序列化时用的也是FastJsonRedisSerializer。如果不使用反序列化传入的数据为String类型而且多加了双引号。

@Bean(name = "redisTemplate")
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
//序列化
FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(fastJsonRedisSerializer);
template.setHashValueSerializer(fastJsonRedisSerializer);
// 全局开启AutoType,这里方便开发,使用全局的方式
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
// 建议使用这种方式,小范围指定白名单
// ParserConfig.getGlobalInstance().addAccept("com.starcity.domain");
// key的序列化采用StringRedisSerializer
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
return template;
}

如果文章对您有帮助麻烦点个赞吧~

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: