您的位置:首页 > 数据库 > Redis

spring-boot集成redis实现消息发布订阅模式,以及多个订阅模式的实现

2019-03-28 16:17 2601 查看

spring-boot集成redis实现消息发布订阅模式,以及多个订阅模式的实现

  • 4.设置消息发布者、消息处理者POJO、redis消息监听容器以及redis监听器注入IOC容器
  • 5.启动项目查看控制台
  • 参照学习的博客作者和地址
  • 6.如何配置多个监听通道
  • demo地址
  • 1.需要的依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    2.配置redis以及连接池

    # redis
    spring:
    redis:
    host: 127.0.0.1
    port: 6379
    #    password:
    database: 1
    timeout: 5000
    jedis:
    pool:
    max-active: 8
    max-wait: 1
    max-idle: 500
    min-idle: 0

    3.创建消息的发布者和消息处理者类

    消息发布者

    @EnableScheduling//开启定时器功能
    @Component
    public class MessageSender {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    /**
    * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat频道发布消息
    */
    @Scheduled(fixedDelay = 2000)
    public void sendMessage() {
    stringRedisTemplate.convertAndSend("chat", String.valueOf(Math.random()));
    }
    }

    消息处理器POJO

    @Component
    public class MessageReceiver {
    
    /**
    * 接收消息方法
    */
    public void receiverMessage(String message) {
    System.out.println("MessageReceiver收到一条新消息:" + message);
    }
    }

    4.设置消息发布者、消息处理者POJO、redis消息监听容器以及redis监听器注入IOC容器

    /**
    * redis配置
    *
    * @author 段誉
    * @create 2019-03-25 9:59
    */
    @Configuration//相当于xml中的beans
    public class RedisConfig {
    
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅了一个叫chat的通道
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    return container;
    }
    
    /**
    * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
    * @param receiver
    * @return
    */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
    //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
    //不填defaultListenerMethod默认调用handleMessage
    return new MyMessageListenerAdapter1(receiver, "receiverMessage");
    }
    
    /**
    * 读取内容的template
    */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
    }
    }

    5.启动项目查看控制台

    MessageReceiver收到一条新消息:0.7372683821918483
    MessageReceiver收到一条新消息:0.800095651812774

    配置成功

    参照学习的博客作者和地址

    以上内容参考作者博客编写
    作者:猿X人
    来源:CSDN
    原文:https://www.geek-share.com/detail/2730629348.html

    6.如何配置多个监听通道

    方式1:一个监听器订阅多个通道

    1.监听容器配置

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅了一个叫chat的通道
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    
    container.addMessageListener(listenerAdapter, new PatternTopic("chat1"));
    return container;
    }

    2.添加一个新的消息发布者

    @EnableScheduling
    @Component
    public class MessageSender2 {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    /**
    * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat频道发布消息
    */
    @Scheduled(fixedDelay = 2000)
    public void sendMessage() {
    stringRedisTemplate.convertAndSend("chat1", "来自chat1的消息" + Math.random());
    }
    }

    3.启动项目查看控制台打印

    MessageReceiver收到一条新消息:0.9988032526665156
    MessageReceiver收到一条新消息:来自chat1的消息0.5760191019007642
    MessageReceiver收到一条新消息:0.37241454741448377
    MessageReceiver收到一条新消息:来自chat1的消息0.639498468451238

    说明配置成功

    方式2:配置多个监听器监听不同的通道

    1.创建一个类继承

    MessageListenerAdapter
    类,添加一个构造函数调用
    MessageListenerAdapter
    public MessageListenerAdapter(Object delegate, String defaultListenerMethod)
    构造方法

    public class MyMessageListenerAdapter1 extends MessageListenerAdapter {
    public MyMessageListenerAdapter1(Object delegate, String defaultListenerMethod) {
    super(delegate, defaultListenerMethod);
    }
    }

    2.创建一个新的消息消息处理POJO

    @Component
    public class MyMessageReceiver1 {
    public void receiverMessage(String message) {
    System.out.println("MyMessageReceiver1接收到消息:" + message);
    }
    }

    3.在redisConfig中再添加一个

    MyMessageListenerAdapter1
    的注入方法

    @Bean
    MyMessageListenerAdapter1 listenerAdapter1(MyMessageReceiver1 receiver) {
    return new MyMessageListenerAdapter1(receiver, "receiverMessage");
    }

    4.配置监听容器

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    MessageListenerAdapter listenerAdapter,
    MyMessageListenerAdapter1 listenerAdapter1) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅了一个叫chat的通道
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    container.addMessageListener(listenerAdapter1, new PatternTopic("chat1"));
    return container;
    }

    5.添加一个新的消息发布者

    @EnableScheduling
    @Component
    public class MessageSender2 {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    /**
    * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat频道发布消息
    */
    @Scheduled(fixedDelay = 2000)
    public void sendMessage() {
    stringRedisTemplate.convertAndSend("chat1", "来自chat1的消息" + Math.random());
    }
    }

    5.启动项目打印日志如下

    MessageReceiver收到一条新消息:0.3005424045447558
    MyMessageReceiver1接收到消息:来自chat1的消息0.03669388620502867
    MessageReceiver收到一条新消息:0.3228170694698418
    MyMessageReceiver1接收到消息:来自chat1的消息0.16289256229958515

    配置成功

    demo地址

    https://gitee.com/fengzxia/springboot-redis-queue

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: