您的位置:首页 > 编程语言 > Java开发

springboot+rabbitmq两小时入门(八):死信交换机

2019-08-09 12:12 363 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_32880973/article/details/98949206

概要:

对于消费失败、消息过期、队列超载的消息,我们可以通过给队列(queue)配置x-dead-letter-exchange和x-dead-letter-routing-key属性,将另一个交换机关联为这个队列(queue)的死信交换机(Dead letter exchange),并把消息转发过去。

该队列所有Nack、Reject、消息过期、队列超载的消息都会自动投递到死信交换机。

 

application.properties配置:

[code]​spring.rabbitmq.host=localhost

# TCP/IP端口为5672,http端口为15672
spring.rabbitmq.port=5672

spring.rabbitmq.username=root

spring.rabbitmq.password=root

# 开启发送确认
spring.rabbitmq.publisher-confirms=true

# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

# 消费者ack有3种模式:NONE、AUTO、MANUAL
# NONE: 不管消费是否成功mq都会把消息剔除,这是默认配置方式。
# MANUAL:手动应答
# AUTO:自动应答,除非MessageListener抛出异常。
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

​

生产者:(沿用上一篇的生成者)

[code]package com.example.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController {

// 这里用的是RabbitTemplate发消息,也可以用AmqpTemplate,推荐使用RabbitTemplate。
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping(value = "/helloRabbit5")
public String sendMQ5(){
String msg = "rabbitmq生成者发送失败和消费失败处理方案";
try {
// 针对网络原因导致连接断开,利用retryTemplate重连10次
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
rabbitTemplate.setRetryTemplate(retryTemplate);

// 确认是否发到交换机,若没有则存缓存,用另外的线程重发,直接在里面用rabbitTemplate重发会抛出循环依赖错误
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
// 存缓存操作
System.out.println(msg + "发送失败:" + cause);
}
});

// 确认是否发到队列,若没有则存缓存,然后检查exchange, routingKey配置,之后重发
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 存缓存操作
System.out.println(new String(message.getBody()) + "找不到队列,exchange为" + exchange + ",routingKey为" + routingKey);
});

rabbitTemplate.convertAndSend("myExchange1", "routingKey4", msg);
} catch (AmqpException e) {
// 存缓存操作
System.out.println(msg + "发送失败:原因重连10次都没连上。");
}

return "success";
}
}

​

消费者:(在上一篇的基础上改造)

[code]package com.example.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

/**
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
*      deliveryTag: 每条消息在mq内部的id,
*      multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);
*      requeue: 是否重新入队
* 给该队列配置死信交换机,Nack、Reject、消息过期、队列超载都会自动投递到死信交换机
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
value = "myQueue6",
arguments = {
@Argument(name = "x-dead-letter-exchange", value = "myExchange1_dlx"),
@Argument(name = "x-dead-letter-routing-key", value = "routingKey4_dlx")}),
exchange = @Exchange(value = "myExchange1"),
key = "routingKey4"
))
public void process7(Message message, Channel channel) throws Exception {
System.out.println("myQueue6:" +  new String(message.getBody()));
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}

/**
* 死信交换机
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "myQueue9"),
exchange = @Exchange(value = "myExchange1_dlx"),
key = "routingKey4_dlx"
))
public void process9(Message message){
System.out.println("myQueue9:" +  new String(message.getBody()));
}
}

启动项目,访问http://localhost:8080/helloRabbit5,控制台打印

myQueue6:rabbitmq生成者发送失败和消费失败处理方案
myQueue9:rabbitmq生成者发送失败和消费失败处理方案

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: