精通RabbitMQ之死信队列与延迟任务调度
2018-12-23 12:54
1096 查看
精通RabbitMQ之死信队列与延迟任务调度
死信队列
在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给生产者并被丢弃;或者如果我们为消息设置了有效期,延期后消息会被放入一个所谓的死信队列中。此时,消息生产者可以选择配置死信队列参数来处理这些特殊情况。
一般来讲呢,死信队列都是一些过期的或者不需要处理的消息,我们这边其实就是故意利用了消息过期之后进入死信队列这个特性来处理延迟任务,为消息设置需要延迟的时间的等长有效期,等消息过期之后从死信队列里面拿出消息处理。
一般来说以下情况会导致消息进入死信队列:
basic.reject/basic.nack
并且设置requeue
为false
(不重回队列)的时候,消息就会进入死信队列 。- 消息队列
TTL
过期或者消息有效期过期。 - 队列达到最大的长度,并且我们没有设置自动拒绝消息的时候,队首的消息就会进入死信队列 。
死信队列实现延迟任务
这里我用springboot给大家演示一个利用消息过期时间来完成延时任务的案例。
application.yml
Spring: application: name: rabbitmq-example rabbitmq: host: 192.168.199.133 port: 5672 username: test password: test123 virtual-host: / vhost: / firstdelaytimeout: 300000 seconddelaytimeout: 600000
AMQPConfig.java
这里,我们先申明等待延迟的队列以及对应的交换机 和 对应死信交换机和死信交换机绑定队列。
@Slf4j @Configuration public class AMQPConfig { @Autowired private RabbitMQProperties rabbitMQProperties; @Bean public ConnectionFactory connectionFactory(){ //第一步配置连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(rabbitMQProperties.getUsername()); factory.setPassword(rabbitMQProperties.getPassword()); factory.setVirtualHost(rabbitMQProperties.getVhost()); factory.setHost(rabbitMQProperties.getHost()); factory.setPort(rabbitMQProperties.getPort()); return factory; } @Bean(destroyMethod = "close") public RabbitMQChannel rabbitMQChannel(ConnectionFactory connectionFactory){ return new RabbitMQChannel(connectionFactory); } //申明延迟交换机 @Bean public AMQP.Exchange.DeclareOk declareFirstDelayExchange(RabbitMQChannel rabbitMQChannel){ log.info(" rabbitMQChannel {} channel {} ",rabbitMQChannel,rabbitMQChannel.getChannel()); try { return rabbitMQChannel.getChannel().exchangeDeclare("first-delay-exchange", BuiltinExchangeType.DIRECT, true, false,false,null); } catch (IOException e) { e.printStackTrace(); } return null; } //申明死信队列交换机 @Bean public AMQP.Exchange.DeclareOk declareFirstDeadExchange(RabbitMQChannel rabbitMQChannel){ try { return rabbitMQChannel.getChannel().exchangeDeclare("first-delay-dead-exchange", BuiltinExchangeType.DIRECT, true, false,false,null); } catch (IOException e) { e.printStackTrace(); } return null; } //申明等待过期队列 @Bean public AMQP.Queue.DeclareOk declareFirstDelayQueen(RabbitMQChannel rabbitMQChannel 1d25b ){ Map<String, Object> map = new HashMap<>(); //标志队列中的消息存活时间,也就是说队列中的消息超过了指定时间会被删除(数字类型,标志时间,以豪秒为单位) map.put("x-message-ttl", rabbitMQProperties.getFirstdelaytimeout()); /* 消息因为超时或超过限制在队列里消失,这样我们就丢失了一些消息,也许里面就有一些是我们做需要获知的。而rabbitmq的死信功能则为我们带来了解决方案。 设置了dead letter exchange与dead letter routingkey(要么都设定,要么都不设定)那些因为超时或超出限制而被删除的消息会被推动到我们设置的exchange中, 再根据routingkey推到queue中. */ map.put("x-dead-letter-exchange","first-delay-dead-exchange"); map.put("x-dead-letter-routing-key","first-delay-dead"); try { return rabbitMQChannel.getChannel().queueDeclare("first-delay-queen", true, false, false, map); } catch (IOException e) { e.printStackTrace(); } return null; } //申明过期投递到死信交换机绑定的队列 @Bean public AMQP.Queue.DeclareOk declareFirstDelayDeadQueen(RabbitMQChannel rabbitMQChannel){ try { return rabbitMQChannel.getChannel().queueDeclare("first-delay-dead-queen", true, false, false, null); } catch (IOException e) { e.printStackTrace(); } return null; } //绑定延迟队列和延迟交换机 @Bean public AMQP.Queue.BindOk bindFirstDelayQueen(RabbitMQChannel rabbitMQChannel){ try { return rabbitMQChannel.getChannel().queueBind("first-delay-queen", "first-delay-exchange", "first-delay", null); } catch (IOException e) { e.printStackTrace(); } return null; } //绑定死信队列和死信交换机 @Bean public AMQP.Queue.BindOk bindFirstDelayDeadQueen(RabbitMQChannel rabbitMQChannel){ try { return rabbitMQChannel.getChannel().queueBind("first-delay-dead-queen", "first-delay-dead-exchange", "first-delay-dead", null); } catch (IOException e) { e.printStackTrace(); } return null; } }
消息发送到设置过期事件的队列
@Slf4j @Component public class QueenRunner implements ApplicationRunner { @Autowired private RabbitMQChannel rabbitMQChannel; SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); @Override public void run(ApplicationArguments args) throws Exception { Channel channel = rabbitMQChannel.getChannel(); try { channel.confirmSelect(); } catch (IOException e) { e.printStackTrace(); } channel.addConfirmListener(new ConfirmListener() { //处理消息成功回执 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { log.info("Ack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } //处理失败回执 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { log.info("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); // 查看下一个要发送的消息的序号 long nextSeqNo = channel.getNextPublishSeqNo(); try { channel.basicPublish("first-delay-exchange", "first-delay", MessageProperties.PERSISTENT_TEXT_PLAIN, "test runeer info".getBytes("utf-8")); } catch (IOException e) { e.printStackTrace(); } log.info("send first message nextSeqNo :{}",nextSeqNo); confirmSet.add(nextSeqNo); } }
消费死信队列中的延迟之后的消息
@Slf4j @Component public class FirstDelayConsumer extends DefaultConsumer { @Autowired private RabbitMQChannel rabbitMQChannel; public FirstDelayConsumer(RabbitMQChannel rabbitMQChannel) { super(rabbitMQChannel.getChannel()); this.rabbitMQChannel = rabbitMQChannel; try { rabbitMQChannel.getChannel().basicConsume("first-delay-dead-queen",false, "first-delay-dead-consumer-tag",this); } catch (IOException e) { e.printStackTrace(); } } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge all deliveries up to // this delivery tag rabbitMQChannel.getChannel().basicAck(deliveryTag, true); log.info("consumerTag : {} deliveryTag : {} message {} ",consumerTag,deliveryTag,new String(body,"utf-8")); } }阅读更多
相关文章推荐
- celery配合rabbitmq任务队列实现任务的异步调度执行
- celery配合rabbitmq任务队列实现任务的异步调度执行[celery redis] 推荐
- Rabbitmq延迟队列实现定时任务的方法
- 源码解析线程池调度器之任务延迟调度实现机制
- RabbitMQ系列-延迟队列
- RabbitMQ消息队列(三):任务分发机制
- 使用rabbitmq工作队列实现任务的负载分发
- RabbitMQ第二篇--工作队列(任务队列)
- 关于利用RabbitMQ实现延迟任务的方法详解
- RabbitMQ如何实现延迟队列?
- RabbitMQ 死信/死信队列
- RabbitMQ如何实现延迟队列?
- 【转】分布式异步任务队列 Celery + rabbitmq (or redis )
- rabbitmq 实现延迟队列的两种方式
- RabbitMQ(六)使用Dead Letter(死信队列)进行延时发送
- RabbitMQ消息队列(三):任务分发机制
- RabbitMQ随手笔记(十四)死信队列、延迟队列(.netCore2.0)
- RabbitMQ消息队列(三):任务分发机制(转)
- rabbitmq AMQP 死信队列报错COMMAND_INVALID,PRECONDITION_FAILED
- C#实现rabbitmq 延迟队列功能