您的位置:首页 > 编程语言 > Java开发

springboot+RabbitMQ做延迟消息详解(一)死信延迟,已运用到公司项目中

2020-02-04 05:20 921 查看

此方法已用到公司商城项目,可用。

业务:订单10分钟未支付,则自动取消订单;

思路:采用死信队列做延迟(该业务的所有消息的延时时间都是一样的才能用)

死信概念:消费达到设置的时间未消费这变为死消息,进入配置的死信队列进行消费。

原理:将消息投放到某队列中(表面队列),改队列无消费者,消息时间到了无法消费,变为死消息,进入死信队列,进行正真的消费,来做到消息的延时。

注意点:这里死信做延迟,是把消息放到死信队列里做延迟的,也就是说放在队列里做延时的,这样消息是先进先出的,有阻塞。因为订单未支付定时取消,10分钟是死的,所以消息先进先出是没有问题的。

如果你是做商品按指定时间上架/店铺的拼团指定时间上下架活动/预售指定上下架活动/优惠卷指定时间上下架活动的,这里的死信做延迟是不满足业务的。
打个比方,如果今天我先指定商品A明天凌晨12点上架售卖,再指定商品B今晚凌晨12点上架售卖,这里今晚12点到了,B消息也无法消费,因为还没到明天凌晨12点,消息A还没消费,阻塞了消息B消费。要等A消息消费完,B才能消费。这个我们下一篇讲解如何解决,如果我还没更新,着急解决的可以+我qq 1872065148 , 我可以先帮你解决这个问题。

pmx配置

<!-- MQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

boot项目配置mq

spring:
rabbitmq:
host: ***
port: 5672
username: ***
password: ***
#设置手动ack回执
listener:
simple:
acknowledge-mode: manual
#none 不确认,auto 自动确认 manual 手动确认

配置rabbitmq的队列、交换机及其转发路由

@Configuration
public class RabbitMqOrderConfig {
//订单消息实际消费交换机,队列,路由
private final static String EXCHANGE_NAME = "mall_exchange";
private final static String QUEUE_NAME = "mall_queue";
private final static String ROUTE_KEY = "mall_routekey";
// 订单消息延迟消费队列所绑定的交换机
private final static String DELAY_EXCHANGE_NAME = "mall_delay_exchange";
private final static String DELAY_QUEUE_NAME = "mall_delay_queue";
private final static String DEALY_ROUTE_KEY = "mall_delay_routekey";

/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange assembleDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(EXCHANGE_NAME)
.durable(true)
.build();
}

/**
* 订单实际消费队列
*/
@Bean
public Queue assembleQueue() {
return new Queue(QUEUE_NAME);
}

/**
* 将订单队列绑定到交换机
*/
@Bean
Binding assembleBinding(DirectExchange assembleDirect, Queue assembleQueue) {
return BindingBuilder
.bind(assembleQueue)
.to(assembleDirect)
.with(ROUTE_KEY);
}

/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange assembleTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(DELAY_EXCHANGE_NAME)
.durable(true)
.build();
}

/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue assembleTtlQueue() {
return QueueBuilder
.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", EXCHANGE_NAME)//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", ROUTE_KEY)//到期后转发的路由键
.build();
}

/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding assembleTtlBinding(DirectExchange assembleTtlDirect, Queue assembleTtlQueue) {
return BindingBuilder
.bind(assembleTtlQueue)
.to(assembleTtlDirect)
.with(DEALY_ROUTE_KEY);
}
}

生产者:

@Component
public class RabbitOrderSender  {
// 订单消息延迟消费队列所绑定的队列,交换机以及路由
private final static String DELAY_EXCHANGE_NAME = "mall_delay_exchange";
private final static String DELAY_QUEUE_NAME = "mall_delay_queue";
private final static String DEALY_ROUTE_KEY = "mall_delay_routekey";

@Autowired
private AmqpTemplate amqpTemplate;

public void sendDelay(Long orderId, Long expirationTime) {
this.amqpTemplate.convertAndSend(DELAY_EXCHANGE_NAME ,DEALY_ROUTE_KEY , orderId, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
//10分钟未支付直接取消订单
message.getMessageProperties().setExpiration(String.valueOf(60*1000*10));
return message;
});
}
}

消费者:

@Component
//监听的队列名(实际消费的队列),该队列的消息都走该消费者
@RabbitListener(queues = "mall_queue")
public class RabbitOrderReceiver {

@Autowired
private OrderService orderSerivce;

@RabbitHandler
public void orderDelayQueue(Long orderId, Message message, Channel channel) {
//ack回执
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
orderSerivce.payChecked(orderId);
}
}

取消订单逻辑:

OrderSerivceIml中
//这里写接口payChecked接口自改订单状态为取消的具体逻辑,省略

业务投放消息:

orderImpl中注入生产者
@Autowired
private RabbitOrderSender  rabbitOrderSender;
//这里省略相关业务逻辑代码...
//投放消息到延迟的交换机中,并设置10分钟后未支付则取消订单
rabbitOrderSender.sendDelay(orderId,60*1000*10L);
  • 点赞
  • 收藏
  • 分享
  • 文章举报
颜姐是只猫 发布了13 篇原创文章 · 获赞 1 · 访问量 1451 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: