您的位置:首页 > 数据库 > Redis

12.springboot使用redis的订阅功能实现消息队列

2017-12-14 00:58 851 查看

1.redis发布/订阅功能

redis提供了发布与队列功能(redis实战p52),简单来说就是某一个客户端A向redis订阅一个通道,其他客户端B向通道发送数据时,客户端A能被动地接到通知到并获取别人发布的数据。
客户端命令如下:
SUBSCRIBE  channel1 channel2..   #订阅一个或多个通道
UNSUBSCRIBE channel1 channel2.. #退订

PUBLISH channel mes       #向通道发布mes

PSUBSCRIBE pattern1 pattern2..  #订阅与给定模式相匹配的所有channel
PUNSUBSCRIBE pattern1 pattern2..  #退订
springboot-redis依赖已经向这些命令封装了,只需要定义一个接受者bean,再将接受者bean和消息处理方法封装成适配器传递给redis监听器容器并指定channel通道名称就可以实现监听。当其他客户端向channel里面publish了数据之后,监听器容器里面包含的所有接受者bean都会执行他们消息处理方法

2.建立工程

(1)pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>com.tyf</groupId>
<artifactId>redis-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>redis-test</name>
<url>http://maven.apache.org</url>

<!-- springboot -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/>
</parent>

<!-- 编码 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>1.4.7.RELEASE</version>
</dependency> <!-- redis -->

</dependencies>

4000

<!-- maven插件 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

</project>


(2)application.properties
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=1
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=500
spring.redis.pool.min-idle=0
spring.redis.timeout=0


(3)接受者
package com.tyf.redis;

import org.springframework.stereotype.Component;

//注入消息接受者,接收到消息自动调用receiveMessage方法
@Component
public class MessageReceiver {

//接收消息的方法
public void receiveMessage(String message){
//这里处理其他逻辑
System.out.println(message);
}

}


(4)redis配置类
package com.tyf.redis;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {

@Bean //消息监听容器(监听redis服务器中channel通道的信息写入)
RedisMessageListenerContainer container(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);

//监听redis中一个channel通道
container.addMessageListener(listenerAdapter, new PatternTopic("channel"));
//这个container 可以添加多个 messageListener
return container;

}

@Bean //消息监听适配器,把消息接受者/消息接受者的消息处理方法封装到适配器中(这个消息监听适配器是消息监听容器所需要的)
MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
//监听到消息手反射调用receiver的receiveMessage方法
return new MessageListenerAdapter(receiver, "receiveMessage");
}

@Bean //注入操作数据的template(这里不需要操作redis数据,和消息队列功能无关)
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}

先将消息接收者封装成监听器适配器,再将适配器传给消息监听容器。容器服务监听通道中的数据变化,当监听到其他客户端的发布时就将容器中的接受者的消息处理方法执行

(5)app

package com.tyf.redis;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@EnableCaching	//开启默认的springcache缓存
@SpringBootApplication
public class App
{
public static void main( String[] args ) throws Exception{
SpringApplication.run(App.class, args);
}

}

3.发布消息查看控制台输出

上面工程只是做了接受者,具体的消息发布者可以重新建立一个工程定期向Channel中publish数据就可以了。这里在cmd下进入redis客户端发布一条消息:



下面的0指的是当前订阅者客户端的数量,我这里还没有开工程所以是0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: