您的位置:首页 > 数据库 > Redis

SpringBoot用redis实现消息队列(生产/消费+发布/订阅)

2020-02-02 13:08 2176 查看

引入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

配置参数

spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.128.132
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=2000

创建一个生产者类

/**
* 生产者类
*/
@Component
public class Producer {

@Autowired
private StringRedisTemplate stringRedisTemplate;

private ListOperations<String,String> listRedis;

@PostConstruct
private void init(){
listRedis = stringRedisTemplate.opsForList();
}

public void publishMessage(String message){
listRedis.leftPush("message",message);
}
}

创建一个消费者

/**
* 消费者类
*/
@Component
public class Consumer {

@Autowired
private StringRedisTemplate stringRedisTemplate;

private ListOperations<String,String> listRedis;

@PostConstruct
public void init(){
listRedis = stringRedisTemplate.opsForList();
}

public void receive(){
while (true){
//取出消息
String message = listRedis.rightPop("message");
if (message==null || message.isEmpty()){
System.out.println("消息队列为空");
break;
}
System.out.println("message:"+message);
}
}

}

创建controller

@RestController
public class Controller {

@Autowired
private SendService sendService;

@RequestMapping("/send/{message}")
public void sendMessage(@PathVariable("message")String message){
sendService.sendMessage(message);
}

}

只需要这三个类,生产者消费者模式就创建好了,可以测试了,启动程序
输入网址

http://localhost:8080/produce/send/hello
http://localhost:8080/produce/send/wrold
http://localhost:8080/produce/send/receive

可看到控制台打印出来了

接下来开始写发布订阅
创建发布者

/**
* 消息发布者
*/
@Service
public class SendService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

public void sendMessage(String message){
try {
stringRedisTemplate.convertAndSend("myChannel",message);
}catch (Exception e){
e.printStackTrace();
}

}

}

订阅者

/**
* 消息订阅者
*/
@Service
public class Receiver {

public void  receiveMessage(String message){
System.out.println("Receive:"+message);
}
}

创建监听器类

/**
* 配置监听器
*/
@Configuration
@AutoConfigureAfter(Receiver.class)
public class SubscriberConfig {
/**
* 注入消息监听适配器
*/
@Bean
public MessageListenerAdapter getMessageListenerAdapter(Receiver receiver){
return new MessageListenerAdapter(receiver, "receiveMessage");
}

/**
* 注入消息监听容器
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer
(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter,new PatternTopic("myChannel"));
return redisMessageListenerContainer;
}
}

然后再创建controller

@RestController
public class Controller {

@Autowired
private SendService sendService;

@RequestMapping("/send/{message}")
public void sendMessage(@PathVariable("message")String message){
sendService.sendMessage(message);
}

}

再启动程序测试。

http://localhost:8080/send/hi
http://localhost:8080/send/hi
http://localhost:8080/send/hi
http://localhost:8080/send/hi%20hello

可以看到每次访问一次控制台就直接访问了 ,无需主动提取

恕我直言,之前做了那么多关于springboot集成的东西,好像和spring没啥区别

  • 点赞
  • 收藏
  • 分享
  • 文章举报
刮瓜蛙 发布了17 篇原创文章 · 获赞 0 · 访问量 302 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: