您的位置:首页 > 其它

RabbitMQ消息确认(发送确认,接收确认)

2017-12-12 23:15 330 查看


消息确认

每个 Consumer 可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer 出错或异常退出,而数据还没有处理完成,那么这段数据就丢失了。因为我们采用 no-ack 的方式进行确认,也就是说,每次 Consumer 接到数据后,不管是否处理完成,RabbitMQ Server
会立即把这个 Message 标记为完成,然后从 Queue 中删除。

为了保证数据不被丢失,RabbitMQ 支持消息确认机制,这种机制下不能采用 no-ack,而应该是在处理完数据后发送 ack。如果处理中途 Consumer 退出了,但是没有发送 ack,那么 RabbitMQ 就会把这个 Message 发送到下一个 Consumer,这样就保证了在 Consumer 异常退出的情况下数据也不会丢失。

这里并没有用到超时机制,RabbitMQ 仅仅通过 Consumer 的连接中断来确认该 Message 并没有被正确处理,也就是说,RabbitMQ 给 Consumer 足够长的时间来做数据处理。

之前的例子中,这就是消息确认机制的应用,这种情况下,即使中断任务执行,也不会影响 RabbitMQ 中消息的处理,RabbitMQ 会将其发送给下一个 Consumer 进行处理。

如果忘记了 ack,那么后果很严重。当 Consumer 退出时,Message 会重新分发。然后 RabbitMQ 会占用越来越多的内存,由于 RabbitMQ 会长时间运行,因此这个“内存泄漏”是致命的。针对这行场景,可以通过以下命令进行 debug:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged


消息持久化

为了保证在 RabbitMQ 退出或者 crash 了数据不丢失,需要将 Queue 和 Message 持久化。

使用spring的代码示例

下面是一个使用spring整合的代码示例:

首先是rabbitmq的配置文件:

[html] view
plain copy

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 -->

<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

<rabbit:connection-factory

id="connectionFactory"

host="${rabbit.host}"

port="${rabbit.port}"

username="${rabbit.username}"

password="${rabbit.password}"

publisher-confirms="true"

/>

<rabbit:admin connection-factory="connectionFactory" />

<!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 -->

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"

confirm-callback="confirmCallBackListener"

return-callback="returnCallBackListener"

mandatory="true"

/>

<rabbit:queue name="CONFIRM_TEST" />

<rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >

<rabbit:bindings>

<rabbit:binding queue="CONFIRM_TEST" />

</rabbit:bindings>

</rabbit:direct-exchange>

<!-- 配置consumer, 监听的类和queue的对应关系 -->

<rabbit:listener-container

connection-factory="connectionFactory" acknowledge="manual" >

<rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />

</rabbit:listener-container>

</beans>

然后发送方:

[java] view
plain copy

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service("publishService")

public class PublishService {

@Autowired

private AmqpTemplate amqpTemplate;

public void send(String exchange, String routingKey, Object message) {

amqpTemplate.convertAndSend(exchange, routingKey, message);

}

}

消费方:

[java] view
plain copy

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service("receiveConfirmTestListener")

public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

@Override

public void onMessage(Message message, Channel channel) throws Exception {

try{

System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch(Exception e){

e.printStackTrace();//TODO 业务处理

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);

}

}

}

确认后回调:

[java] view
plain copy

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.stereotype.Service;

@Service("confirmCallBackListener")

public class ConfirmCallBackListener implements ConfirmCallback{

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);

}

}

失败后return回调:

[java] view
plain copy

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

import org.springframework.stereotype.Service;

@Service("returnCallBackListener")

public class ReturnCallBackListener implements ReturnCallback{

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);

}

}

测试类:

[java] view
plain copy

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.dingcheng.confirms.publish.PublishService;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = {"classpath:application-context.xml"})

public class TestConfirm {

@Autowired

private PublishService publishService;

private static String exChange = "DIRECT_EX";

@Test

public void test1() throws InterruptedException{

String message = "currentTime:"+System.currentTimeMillis();

System.out.println("test1---message:"+message);

//exchange,queue 都正确,confirm被回调, ack=true

publishService.send(exChange,"CONFIRM_TEST",message);

Thread.sleep(1000);

}

@Test

public void test2() throws InterruptedException{

String message = "currentTime:"+System.currentTimeMillis();

System.out.println("test2---message:"+message);

//exchange 错误,queue 正确,confirm被回调, ack=false

publishService.send(exChange+"NO","CONFIRM_TEST",message);

Thread.sleep(1000);

}

@Test

public void test3() throws InterruptedException{

String message = "currentTime:"+System.currentTimeMillis();

System.out.println("test3---message:"+message);

//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE

publishService.send(exChange,"",message);

// Thread.sleep(1000);

}

@Test

public void test4() throws InterruptedException{

String message = "currentTime:"+System.currentTimeMillis();

System.out.println("test4---message:"+message);

//exchange 错误,queue 错误,confirm被回调, ack=false

publishService.send(exChange+"NO","CONFIRM_TEST",message);

Thread.sleep(1000);

}

}

测试结果:

[html] view
plain copy

test1---message:currentTime:1483786948506

test2---message:currentTime:1483786948532

consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506

test3---message:currentTime:1483786948536

confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)

confirm--:correlationData:null,ack:false,cause:Channel closed by application

[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)

return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:

confirm--:correlationData:null,ack:true,cause:null

test4---message:currentTime:1483786948546

confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)

[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)

代码和配置里面,已经都有注释,就不在多说明了.(callback是异步的,所以测试中sleep1秒钟等待下)

总结下就是:

如果消息没有到exchange,则confirm回调,ack=false

如果消息到达exchange,则confirm回调,ack=true

exchange到queue成功,则不回调return

exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

备注:需要说明,spring-rabbit和原生的rabbit-client ,表现是不一样的.

测试的时候,原生的client,exchange错误的话,直接就报错了,是不会到confirmListener和returnListener的

源码地址:https://github.com/qq315737546/spring-rabbit
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: