您的位置:首页 > 其它

RabbitMQ使用场景练习:消息确认机制(十一)

2016-12-22 15:32 453 查看
消息确认机制
RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction机制,但后者的优势在于强一致性。如果没有特别的要求,建议使用conrim机制。 

1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。 
2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。 
3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式: 
   basicAck:成功消费,消息从队列中删除 
   basicNack:requeue=true,消息重新进入队列,false被删除 
   basicReject:等同于basicNack 
   basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer 

应答模式之transaction机制
需要手动提交和回滚,执行txCommit,消息才会转发给队列进入ready状态;执行txRollback,消息被取消 

Java代码  


package com.demo.mq.rabbitmq.example11;  

import java.io.IOException;  

import java.io.Serializable;  

import org.apache.commons.lang3.SerializationUtils;  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.AMQP;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.DefaultConsumer;  

import com.rabbitmq.client.Envelope;  

  

/** 

 * 应答模式之transaction机制 

 * @author sheungxin 

 * 

 */  

public class TxDemo {  

    private static String exchange_name="";  

    private static String queue_name="tx_queue";  

      

    /** 

     * transaction机制发送消息,事务机制:手动提交和回滚 

     * 执行txCommit,消息才会转发给队列进入ready状态 

     * 执行txRollback,消息被取消 

     * @param mes 

     * @throws Exception 

     */  

    public static void txSend(Serializable mes) throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        //开启transaction机制  

        channel.txSelect();  

        channel.queueDeclare(queue_name,false,false,true,null);  

        for(int i=0;i<10;i++){  

            try{  

                channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));  

                //do something  

//              int n=5/0;//试验消息回滚  

                channel.txCommit();//提交消息  

                System.out.println("发布消息"+mes.toString()+i);  

            }catch(Exception e){  

                channel.txRollback();//异常,取消消息  

                System.out.println("回滚消息"+mes.toString()+i);  

            }  

        }  

    }  

  

    /** 

     * transaction机制接收消息,事务机制:手动提交和回滚 

     * 消费者需要执行basicAck,并txCommit(自动应答模式自动处理,本例中采用手动应答模式) 

     * @throws Exception 

     */  

    public static void txRecv() throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        //开启transaction机制  

        channel.txSelect();  

        channel.queueDeclare(queue_name,false,false,true,null);  

        //关闭自动应答模式(自动应答模式不需要ack、txCommit),需要手动basicAck,并执行txCommit  

        channel.basicConsume(queue_name, false, new DefaultConsumer(channel){  

            @Override  

            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  

                String mes=SerializationUtils.deserialize(body);  

                System.out.println("tx Received :'"+mes+"' done");  

                channel.basicAck(envelope.getDeliveryTag(), false);  

                channel.txCommit();  

                  

            }  

        });  

    }  

      

    public static void main(String[] args) throws Exception {  

        txSend("hello world!");  

        txRecv();  

    }  

}  

应答模式之confirm机制
1、确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费) 
2、confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms 
3、ConfirmListener、waitForConfirms均需要配合confirm机制使用 
4、暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效 
5、basicNack、basicReject:参数requeue=true时,消息会重新进入队列 
6、autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉 

Java代码  


package com.demo.mq.rabbitmq.example11;  

import java.io.IOException;  

import java.io.Serializable;  

import org.apache.commons.lang3.SerializationUtils;  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.ConfirmListener;  

import com.rabbitmq.client.Connection;  

  

/** 

 * 应答模式之confirm机制:消息发送 

 * @author sheungxin 

 * 

 */  

public class ConfirmSend {  

    private static String exchange_name="";  

    private static String queue_name="tx_queue";  

      

    /** 

     * confirm机制:确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费) 

     * confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms 

     * ConfirmListener、waitForConfirms均需要配合confirm机制使用 

     * @param mes 

     * @throws Exception 

     */  

    public static void txSend(Serializable mes) throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        //开启transaction机制  

        channel.confirmSelect();  

        channel.queueDeclare(queue_name,false,false,true,null);  

        //异步实现发送消息的确认(此部分的消息确认是指发送消息到队列,并非确认消息的有效消费)  

        channel.addConfirmListener(new ConfirmListener() {  

              

            @Override  

            public void handleNack(long deliveryTag, boolean multiple)  

                    throws IOException {  

                //multiple:测试发现multiple随机true或false,原因未知  

                System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple);  

            }  

              

            @Override  

            public void handleAck(long deliveryTag, boolean multiple)  

                    throws IOException {  

                System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple);  

            }  

        });  

        for(int i=0;i<10;i++){  

            channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));  

        }  

//      channel.waitForConfirms();//同步实现发送消息的确认  

        System.out.println("-----------");  

        channel.close();  

        conn.close();  

    }  

      

    public static void main(String[] args) throws Exception {  

        txSend("hello world!");  

    }  

}  

Java代码  


package com.demo.mq.rabbitmq.example11;  

import java.io.IOException;  

import org.apache.commons.lang3.SerializationUtils;  

import com.demo.mq.rabbitmq.MqManager;  

import com.rabbitmq.client.AMQP;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.DefaultConsumer;  

import com.rabbitmq.client.Envelope;  

  

/** 

 * 应答模式之confirm机制:消息接收 

 * @author sheungxin 

 * 

 */  

public class ConfirmRecv {  

    private static String queue_name="tx_queue";  

  

    /** 

     * confirm机制:暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效 

     * basicNack、basicReject:参数requeue=true时,消息会重新进入队列 

     * autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉 

     * @throws Exception 

     */  

    public static void txRecv() throws Exception{  

        Connection conn=MqManager.newConnection();  

        Channel channel=conn.createChannel();  

        //开启transaction机制  

//      channel.confirmSelect();  

        //autoDelete,true只要被消息  

        channel.queueDeclare(queue_name,false,false,true,null);  

        //关闭自动应答模式  

        channel.basicConsume(queue_name, false, new DefaultConsumer(channel){  

            @Override  

            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  

                String mes=SerializationUtils.deserialize(body);  

                //multiple批量提交,true提交小于参数中tag消息  

                long n=envelope.getDeliveryTag()%3;  

                if(n==0){  

                    channel.basicAck(envelope.getDeliveryTag(), false);  

                }else if(n==1){  

                    //requeue,true重新进入队列  

                    channel.basicNack(envelope.getDeliveryTag(), false, true);  

                }else{  

                    //requeue,true重新进入队列,与basicNack差异缺少multiple参数  

                    channel.basicReject(envelope.getDeliveryTag(), true);  

                }  

                try {  

                    Thread.sleep(2*1000);  

                } catch (InterruptedException e) {  

                    e.printStackTrace();  

                }  

                System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done");  

            }  

        });  

    }  

      

    public static void main(String[] args) throws Exception {  

        txRecv();  

    }  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: