spring boot amqp实现延迟队列功能
2018-05-22 09:24
716 查看
@Configuration public class MQConfig { /** * 发送到该队列的message会在一段时间后过期进入到delay_process_queue * 队列里所有的message都有统一的失效时间 */ public final static String DELAY_QUEUE_PER_QUEUE_TTL_NAME = "delay_queue_per_queue_ttl"; final static int QUEUE_EXPIRATION = 10000; /** * message失效后进入的队列,也就是实际的消费队列 */ final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue"; /** * DLX */ final static String DELAY_EXCHANGE_NAME = "delay_exchange"; /** * 路由到delay_queue_per_queue_ttl的exchange */ public final static String PER_QUEUE_TTL_EXCHANGE_NAME = "per_queue_ttl_exchange"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.10.151"); connectionFactory.setUsername("myc"); connectionFactory.setPassword("myc"); // connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate amqpTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } /** * 创建DLX exchangedelayQueuePerMessageTTL * * @return */ @Bean DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } /** * 创建per_queue_ttl_exchange * * @return */ @Bean DirectExchange perQueueTTLExchange() { return new DirectExchange(PER_QUEUE_TTL_EXCHANGE_NAME); } /** * 创建delay_queue_per_queue_ttl队列 * * @return */ @Bean Queue delayQueuePerQueueTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间 .build(); } /** * 创建delay_process_queue队列,也就是实际消费队列 * * @return */ @Bean Queue delayProcessQueue() { return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build(); } /** * 将DLX绑定到实际消费队列 * * @param delayProcessQueue * @param delayExchange * @return */ @Bean Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME); } /** * 将per_queue_ttl_exchange绑定到delay_queue_per_queue_ttl队列 * * @param delayQueuePerQueueTTL * @param perQueueTTLExchange * @return */ @Bean Binding queueTTLBinding(Queue delayQueuePerQueueTTL, DirectExchange perQueueTTLExchange) { return BindingBuilder.bind(delayQueuePerQueueTTL) .to(perQueueTTLExchange) .with(DELAY_QUEUE_PER_QUEUE_TTL_NAME); }
消息的生产者:
amqpTemplate.convertAndSend(MQConfig.PER_QUEUE_TTL_EXCHANGE_NAME, MQConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME, "aa");
相关文章推荐
- Spring Boot与RabbitMQ结合实现延迟队列的示例
- 详细介绍Spring Boot + RabbitMQ实现延迟队列
- SpringBoot中发送QQ邮件功能的实现代码
- SpringBoot JPA实现增删改查、分页、排序、事务操作等功能
- C#实现rabbitmq 延迟队列功能
- 用rebbitMq来实现你的延迟队列功能
- SpringBoot 集成 rabbitmq 简单实现通过队列进行,订单系统与库存系统,物流系统之间的信息交互
- Spring Boot RabbitMQ 延迟消息实现完整版示例
- Spring boot 集成 Kaptcha 实现前后端分离验证码功能
- Spring Boot与RabbitMQ延迟队列使用示例
- SpringBoot非官方教程 | 第十四篇:在springboot中用redis实现消息队列
- SpringBoot+Shiro学习之“记住我”和“GIF验证码”功能的实现
- springboot实现定时器功能
- 企业级 SpringBoot 教程 (十四) 在springboot中用redis实现消息队列
- 【转】redis 消息队列发布订阅模式spring boot实现
- 在Spring Boot框架下使用WebSocket实现聊天功能
- C#实现rabbitmq 延迟队列功能
- spring boot Rabbitmq集成,延时消息队列实现
- SpringBoot+Thymeleaf实现html文件引入(类似include功能)
- Java Spring Boot 服务器端断点续传功能支持 实现代码