您的位置:首页 > 其它

[RabbitMQ]09_RabbitMQ学习之消息可靠性及特性

2017-09-03 00:00 1341 查看
摘要: RabbitMQ学习之消息可靠性及特性

下面主要从队列、消息发送、消息接收方面了解消息传递过的一些可靠性处理。
1、队列
消费者是无法订阅或者获取不存在的MessageQueue中信息。消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
声明一个队列
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
durable:声明队列持久化
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:

其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。

其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。

其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。

这种队列适用于只限于一个客户端发送读取消息的应用场景。
autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
其他选项,channel.queueDeclarePassive:例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
2、发送消息
1). 发送消息设置
channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
basicProperties:通过参数实现消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN

public BasicProperties(
String contentType,//消息类型如:text/plain
String contentEncoding,//编码
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//优先级
String correlationId,
String replyTo,//反馈队列
String expiration,//expiration到期时间
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
immediate:当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
2). 事务机制
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会被服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

channel.txSelect()
...
channel.txCommit()
...
channel.txRollback()


1

2

3

4

5

1

2

3

4

5

3). Confirm机制
事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。可以mandatory配合实现消息的发送可靠性。

// confirm 异步机制 通过注册listener,实现异步ack,提高性能
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//失败重发
}

@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//确认ok
}
});


1

2

3

4

5

6

7

8

9

10

11

12

13

1

2

3

4

5

6

7

8

9

10

11

12

13

//confirm 同步机制
if(channel.waitForConfirms(timeout)){
//确认ok
}else{
//失败从发
}


1

2

3

4

5

6

1

2

3

4

5

6

3、消息接收
1). autoAck
为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者。默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认.

boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
...
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
//确认消息,已经收到
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);


1

2

3

4

5

6

7

1

2

3

4

5

6

7

2). 公平调度
让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(prefetchCount);可以设置。
3). exclusive
和queue一样,设置了true,只有第一个启动的消费者可用。
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RabbitMQ Confirm 事务