您的位置:首页 > 编程语言 > Java开发

Spring Boot AMQP 实现延迟队列功能

2018-05-22 09:24 716 查看
@Configuration
public class MQConfig {
/**
 * Name of the delay (holding) queue. A message sent to this queue expires after a
 * fixed time and is then moved on to {@code delay_process_queue}. The expiry is
 * configured on the queue, so every message in it shares the same TTL.
 */
public static final String DELAY_QUEUE_PER_QUEUE_TTL_NAME = "delay_queue_per_queue_ttl";

/**
 * Value passed as the delay queue's {@code x-message-ttl} argument.
 * RabbitMQ documents that argument as milliseconds, i.e. 10 seconds here.
 */
static final int QUEUE_EXPIRATION = 10000;

/**
 * Name of the queue that receives messages once they have expired, i.e. the queue
 * that is actually consumed. Also used as the dead-letter routing key.
 */
static final String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue";

/**
 * Name of the dead-letter exchange (DLX).
 */
static final String DELAY_EXCHANGE_NAME = "delay_exchange";

/**
 * Name of the exchange that routes messages to {@code delay_queue_per_queue_ttl}.
 */
public static final String PER_QUEUE_TTL_EXCHANGE_NAME = "per_queue_ttl_exchange";

/**
 * Declares the RabbitMQ connection factory bean.
 *
 * NOTE(review): the broker host and the credentials are hard-coded in this method;
 * consider moving them to externalized configuration before using this outside a demo.
 *
 * @return a {@link CachingConnectionFactory} for host 192.168.10.151 with user "myc"
 */
@Bean
public ConnectionFactory connectionFactory() {
    final CachingConnectionFactory factory = new CachingConnectionFactory("192.168.10.151");
    factory.setPassword("myc");
    factory.setUsername("myc");
    // Publisher confirms are not enabled by this method; the original author left
    // factory.setPublisherConfirms(true) switched off.
    return factory;
}

/**
 * Declares the {@link AmqpAdmin} bean as a {@link RabbitAdmin} built on the
 * connection factory returned by {@link #connectionFactory()}.
 *
 * @return the admin instance
 */
@Bean
public AmqpAdmin amqpAdmin() {
    final ConnectionFactory factory = connectionFactory();
    return new RabbitAdmin(factory);
}

/**
 * Declares a prototype-scoped {@link RabbitTemplate} built on the connection
 * factory returned by {@link #connectionFactory()}.
 *
 * @return a new template with default settings
 */
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate amqpTemplate() {
    return new RabbitTemplate(connectionFactory());
}

/**
 * Creates the dead-letter exchange (DLX) named {@code delay_exchange}.
 *
 * @return a direct exchange named by {@code DELAY_EXCHANGE_NAME}
 */
@Bean
DirectExchange delayExchange() {
    final DirectExchange deadLetterExchange = new DirectExchange(DELAY_EXCHANGE_NAME);
    return deadLetterExchange;
}

/**
 * Creates the {@code per_queue_ttl_exchange} exchange.
 *
 * @return a direct exchange named by {@code PER_QUEUE_TTL_EXCHANGE_NAME}
 */
@Bean
DirectExchange perQueueTTLExchange() {
    final DirectExchange ttlExchange = new DirectExchange(PER_QUEUE_TTL_EXCHANGE_NAME);
    return ttlExchange;
}

/**
 * Creates the durable {@code delay_queue_per_queue_ttl} queue.
 *
 * The queue is declared with three arguments: the dead-letter exchange, the
 * routing key attached to dead-lettered messages, and the queue-wide message TTL.
 *
 * @return the delay queue definition
 */
@Bean
Queue delayQueuePerQueueTTL() {
    QueueBuilder delayQueue = QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME);
    // Exchange that expired messages are dead-lettered to (the DLX).
    delayQueue = delayQueue.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME);
    // Routing key carried by the dead-lettered message.
    delayQueue = delayQueue.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME);
    // Expiry applied to every message in this queue.
    delayQueue = delayQueue.withArgument("x-message-ttl", QUEUE_EXPIRATION);
    return delayQueue.build();
}

/**
 * Creates the durable {@code delay_process_queue} queue, i.e. the queue that is
 * actually consumed.
 *
 * @return the processing queue definition
 */
@Bean
Queue delayProcessQueue() {
    final QueueBuilder processQueue = QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME);
    return processQueue.build();
}

/**
 * Binds the actual consumption queue to the dead-letter exchange, using
 * {@code DELAY_PROCESS_QUEUE_NAME} as the routing key.
 *
 * @param delayProcessQueue the queue to bind
 * @param delayExchange     the dead-letter exchange to bind it to
 * @return the binding between the two
 */
@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
    final String routingKey = DELAY_PROCESS_QUEUE_NAME;
    return BindingBuilder.bind(delayProcessQueue).to(delayExchange).with(routingKey);
}

/**
 * Binds the {@code delay_queue_per_queue_ttl} queue to {@code per_queue_ttl_exchange},
 * using {@code DELAY_QUEUE_PER_QUEUE_TTL_NAME} as the routing key.
 *
 * @param delayQueuePerQueueTTL the delay queue to bind
 * @param perQueueTTLExchange   the exchange to bind it to
 * @return the binding between the two
 */
@Bean
Binding queueTTLBinding(Queue delayQueuePerQueueTTL, DirectExchange perQueueTTLExchange) {
    final String routingKey = DELAY_QUEUE_PER_QUEUE_TTL_NAME;
    return BindingBuilder.bind(delayQueuePerQueueTTL).to(perQueueTTLExchange).with(routingKey);
}

消息的生产者:

// Publish the payload "aa" to the per-queue-TTL exchange, using the delay queue's name as the routing key.
amqpTemplate.convertAndSend(MQConfig.PER_QUEUE_TTL_EXCHANGE_NAME, MQConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME, "aa");
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spring Boot Spring AMQP