(转)RabbitMQ学习之消息可靠性及特性
2017-07-06 09:42
204 查看
http://blog.csdn.net/zhu_tianwei/article/details/53971296
下面主要从队列、消息发送、消息接收方面了解消息传递过的一些可靠性处理。
1、队列
消费者是无法订阅或者获取不存在的MessageQueue中信息。消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
声明一个队列
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
durable:声明队列持久化
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
其他选项,channel.queueDeclarePassive:例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
2、发送消息
1.发送消息设置
channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
basicProperties:通过参数实现消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN
mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
immediate:当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
2.事务机制
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
3.Confirm机制
事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。可以mandatory配合实现消息的发送可靠性。
3、消息接收
1.autoAck
为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者。默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认.
2.公平调度
让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(prefetchCount);可以设置。
3.exclusive
和queue一样,设置了true,只有第一个启动的消费者可用。
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)
下面主要从队列、消息发送、消息接收方面了解消息传递过的一些可靠性处理。
1、队列
消费者是无法订阅或者获取不存在的MessageQueue中信息。消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
声明一个队列
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
durable:声明队列持久化
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
其他选项,channel.queueDeclarePassive:例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
2、发送消息
1.发送消息设置
channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
basicProperties:通过参数实现消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN
public BasicProperties( String contentType,//消息类型如:text/plain String contentEncoding,//编码 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//优先级 String correlationId, String replyTo,//反馈队列 String expiration,//expiration到期时间 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
immediate:当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
2.事务机制
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
channel.txSelect() ... channel.txCommit() ... channel.txRollback()
3.Confirm机制
事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。可以mandatory配合实现消息的发送可靠性。
// confirm 异步机制 通过注册listener,实现异步ack,提高性能 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //失败重发 } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { //确认ok } });
//confirm 同步机制 if(channel.waitForConfirms(timeout)){ //确认ok }else{ //失败从发 }
3、消息接收
1.autoAck
为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者。默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认.
boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); ... QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); //确认消息,已经收到 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
2.公平调度
让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(prefetchCount);可以设置。
3.exclusive
和queue一样,设置了true,只有第一个启动的消费者可用。
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)
相关文章推荐
- [RabbitMQ]09_RabbitMQ学习之消息可靠性及特性
- RabbitMQ学习之消息可靠性及特性
- RabbitMQ学习之集群消息可靠性测试
- RabbitMQ学习之集群消息可靠性测试
- RabbitMQ学习之spring整合发送异步消息(注解实现)
- RabbitMQ学习之基于spring-rabbitmq的消息异步发送
- RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)
- RabbitMQ (消息队列)专题学习04 Publish/Subscribe(发布者/订阅者)
- 学习之路之三十:利用消息机制自定义数据库回滚特性
- RabbitMQ学习总结(6)——消息的路由分发机制详解
- RabbitMQ学习之spring整合发送异步消息
- rabbitmq学习10:使用spring-ampq发送消息及异步接受消息
- RabbitMQ学习总结(6)——消息的路由分发机制详解
- apache Storm学习之三-消息可靠性
- 利用Python学习RabbitMQ消息队列
- RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)
- RabbitMQ (消息队列)专题学习02 Hello World
- rabbitmq学习9:使用spring-amqp发送消息及同步接受消息
- RabbitMQ (消息队列)专题学习06 Topic
- RabbitMQ(python实现)学习之三:Routing(接收端接收固定类型消息)