您的位置:首页 > 其它

RabbitMQ实现消息确认机制

2019-01-24 15:26 288 查看

这里只介绍confirm模式,事务模式就不演示了

rabbitmq消息确认 分为 生产者确认 和 消费者确认 两者不耦合

  1. 发布端确认

 

  • 引入Jar包

这里使用的是gradle配置

compile('org.springframework.boot:spring-boot-starter-amqp')

配置参数

spring.rabbitmq.addresses localhost:5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=gues

#消息发送到交换机确认机制,是否确认回调
spring.rabbitmq.publisher-returns=true
#消息发送到交换机确认机制,是否返回回调
spring.rabbitmq.publisher-confirms=true

 配置一个队列

[code]
@Bean
Queue  myQueue(){
return new Queue("myQueue");
}
@Bean
DirectExchange myDirectExchange(){
return new DirectExchange("myDirectExchange");
}
@Bean
Binding myBinding(){
return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("my.routing.key");
}

配置发送类 注意这里需要实现两个回调类并设置一下template ,这里可以由config类去实现

[code]/**
* author:zw
*/
@Component
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);
@Autowired
RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

public void sendDirectMsg(Object msg){
CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());
System.out.println("消息id" + correlationData.getId());
rabbitTemplate.convertAndSend("myDirectExchange","my.routing.key",msg,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("发布消息确认-消息id:" + correlationData.getId());
if (ack) {
System.out.println("消息发送确认成功");
} else {
System.out.println("消息发送确认失败:" + cause);
}
}

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode
+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
}

2.消费端确认

 

#监听异常重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
#是否开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

#是否开启拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#确认机制 自动 手动
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

[code]/**
* author:zw
*/
@Component
public class RabbitMQConsumer {

@RabbitListener(queues = "myQueue")
public void process(Message msg, Channel channel) throws IOException {
System.out.println("进入myQueue");
if (null != msg && null != msg.getBody() && 0 != msg.getBody().length)
//这里介绍三个参数的含义
//msg.getMessageProperties().getDeliveryTag()  当前消息的tagId
//multiple true 确认之前收到的消息,false 只确认当前消息
//requeue true 重新回到队列 false 不回到队列
HashMap<String, String> map = (HashMap<String, String>) SerializationUtils.deserialize(msg.getBody());
if("20".equals(map.get("age"))){
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);//消息确认
System.out.println("消息确认");
}else if("25".equals(map.get("age"))){
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);// 消息取消确认
System.out.println("消息不确认");
}else{
channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);//消息拒绝 返回队列
System.out.println("消息拒绝");
}
}
}

运行一下查看一下结果

第一次传入age=20的消息 保证发布端和消费端都确认成功

由截图可见 发送端已经确认消息发到交换机并且成功路由到匹配队列

消费端进入消费并手动确认消费

 

第二次 修改发送端发送的交换机名称为Mq服务器里不存在的交换机名然后查看结果

修改信息如下

结果如下

第三次 修改发送端路由为没有匹配队列的路由名

修改如下

结果如下

由图可见消息被确认发送到了交换机但因没有找到匹配路由键的队列所以消息被退回

第四次 观察消费端不确认消息并开启重返队列效果

修改配置如下

传入age=25 结果如下

由图可见如果一直不做处理的情况下,队列被不确认后会一直重新回到队列被消费

 

第五次 测试拒绝

修改代码如下

增加一个拒绝条件判断 当一个消息被重复消费超过10次消息拒绝

以上就是RabbitMQ的confirm确认机制实现代码

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: