您的位置:首页 > 其它

ActiveMQ的可靠性机制的相关概念介绍

2016-11-01 13:27 183 查看
1、确认JMS消息

        只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

         

          • Session.AUTO_ACKNOWLEDGE。

                   当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

         • Session.CLIENT_ACKNOWLEDGE。

                   客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。

1、生产者,设置签收模式为Session.CLIENT_ACKNOWLEDGE
// 通过Connection对象创建Session会话(上下文环境对象),
// 参数一,表示是否开启事务
// 参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收
Session session = connection.createSession(Boolean.FALSE,
Session.CLIENT_ACKNOWLEDGE);

2、消费者,设置签收模式为Session.CLIENT_ACKNOWLEDGE
// 通过Connection对象创建Session会话(上下文环境对象),
// 参数一,表示是否开启事务
// 参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收
Session session = connection.createSession(Boolean.FALSE,
Session.CLIENT_ACKNOWLEDGE);

3、消费者,在消费消息的时候,返回一个确认
// 使用Session来创建消息对象的生产者或者消费者
MessageConsumer createConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) createConsumer.receive();
if (textMessage == null)
break;
// 客户端的签收模式,
textMessage.acknowledge();
System.out.println("收到的内容为" + textMessage.getText());
}


         • Session.DUPS_ACKNOWLEDGE。

                   该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。

2、持久化

      JMS 支持以下两种消息提交模式:

            • PERSISTENT。

                       指示JMS Provider持久保存消息,以保证消息不会因为JMS Provider的失败而丢失。

            • NON_PERSISTENT。

                       不要求JMS Provider持久保存消息。

3、优先级

         可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

4、消息过期

        可以设置消息在一定时间后过期,默认是永不过期。

#在生产者发送消息的过程中,可以指定deliveryMode传输模式,priority消息优先级,timeToLive消息过期时间

void send(Message message, int deliveryMode, int priority, long timeToLive );
         其中,deliveryMode为传输模式,priority为消息优先级,timeToLive为消息过期时间。

         ActiveMQ支持两种消息的传送模式,PERSISTENT和NON_PERSISTENT两种。如果不指定传输模式,默认的就是持久化消息。如果容忍消息丢失,那么可以使用非持久化的消息机制以改善性能和减少开销。

         消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。如果不指定优先级,默认为4.JMS不要求严格按照这十个优先级发送消息,但必须保证加急消息要优先于普通消息到达。

         默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间,单位为毫秒。

// 5、使用Session来创建消息对象的生产者或者消费者
MessageProducer messageProducer = session.createProducer(null);

// 6、最后使用JMS规范的TextMessage形式创建数据(通过Session对象)
// 并利用MessageProducer的send方法发送数据
for (int i = 0; i < 5; i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息" + i);

// 7、在发送消息的时候,指定消息的目的地,持久化方式,优先级,以及消息的过期时间

messageProducer.send(destination, textMessage,
DeliveryMode.NON_PERSISTENT, i, 1000 * 60);
}


5、临时目的地

         可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

6、持久订阅

        首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。 JMS Provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS
Provider会象客户发送客户处于非激活状态时所发布的消息。 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

7、本地事务

         在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。 需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。
需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息