RabbitMQ(二)队列与消息的持久化
2011-04-29 10:33
453 查看
当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要做其它的事情,且会消耗很长的时间,在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可以让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认情况下,我们创建的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,创建的消息队列以及消息都不会保存,为了解决这种情况,保证消息传输的可靠性,我们可以使用RabbitMQ提供的消息队列的持久化机制。
生产者:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.MessageProperties;
5 public class ClientSend1 {
6 public static final String queue_name="my_queue";
7 public static final boolean durable=true; //消息队列持久化
8 public static void main(String[] args)
9 throws java.io.IOException{
ConnectionFactory factory=new ConnectionFactory(); //创建连接工厂
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection(); //创建连接
Channel channel=connection.createChannel();//创建信道
channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
String message="Hello world"+Math.random();
//将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Send message:"+message);
channel.close();
connection.close();
}
}
说明:
行17 和行20 需要同时设置,也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在
消费者:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.QueueingConsumer;
5 public class ClientReceive1 {
6 public static final String queue_name="my_queue";
7 public static final boolean autoAck=false;
8 public static final boolean durable=true;
9 public static void main(String[] args)
throws java.io.IOException,java.lang.InterruptedException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(queue_name, durable, false, false, null);
System.out.println("Wait for message");
channel.basicQos(1); //消息分发处理
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicConsume(queue_name, autoAck, consumer);
while(true){
Thread.sleep(500);
QueueingConsumer.Delivery deliver=consumer.nextDelivery();
String message=new String(deliver.getBody());
System.out.println("Message received:"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
}
}
说明:
行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:
生产者:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.MessageProperties;
5 public class ClientSend1 {
6 public static final String queue_name="my_queue";
7 public static final boolean durable=true; //消息队列持久化
8 public static void main(String[] args)
9 throws java.io.IOException{
ConnectionFactory factory=new ConnectionFactory(); //创建连接工厂
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection(); //创建连接
Channel channel=connection.createChannel();//创建信道
channel.queueDeclare(queue_name, durable, false, false, null); //声明消息队列,且为可持久化的
String message="Hello world"+Math.random();
//将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Send message:"+message);
channel.close();
connection.close();
}
}
说明:
行17 和行20 需要同时设置,也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在
消费者:
1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.QueueingConsumer;
5 public class ClientReceive1 {
6 public static final String queue_name="my_queue";
7 public static final boolean autoAck=false;
8 public static final boolean durable=true;
9 public static void main(String[] args)
throws java.io.IOException,java.lang.InterruptedException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(queue_name, durable, false, false, null);
System.out.println("Wait for message");
channel.basicQos(1); //消息分发处理
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicConsume(queue_name, autoAck, consumer);
while(true){
Thread.sleep(500);
QueueingConsumer.Delivery deliver=consumer.nextDelivery();
String message=new String(deliver.getBody());
System.out.println("Message received:"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
}
}
说明:
行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:
相关文章推荐
- RabbitMQ(二)队列与消息的持久化
- Java使用Rabbitmq惊喜队列queue和消息内容的本地持久化核心方法。(内容存储在硬盘)
- RabbitMQ队列与消息的持久化
- RabbitMQ之队列与消息持久化
- RabbitMQ系列三 (深入消息队列) 持久化
- RabbitMQ之消息持久化(队列持久化、消息持久化)
- RabbitMQ3.7.2入门到进阶之工作队列消息应答ack与消息持久化durable
- RabbitMQ之队列与消息持久化
- RabbitMQ之消息持久化(队列持久化、消息持久化)
- 消息队列RabbitMQ
- Redis与RabbitMQ作为消息队列的比较
- 学渣讲消息队列之RabbitMQ从敲门到入门(第五讲)—— "Routing"
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- 开源稳定的消息队列 RabbitMQ 安装篇
- RabbitMQ消息队列(六):使用主题进行消息分发
- rabbitMQ 消息队列
- (转)消息队列之RabbitMQ-基础概念详细介绍
- 在 CentOS7 上安装 RabbitMQ 消息队列中间件
- RabbitMQ消息队列-Centos7下安装RabbitMQ3.6.1
- RabbitMQ消息队列(九):Publisher的消息确认机制