RabbitMQ 消息轮询和消息确认机制
2017-03-08 17:52
417 查看
1 消息轮询:默认的情况下,服务器会将消息发送给下一个消费者,使得每个消费者获得消息数量尽量相同。
2 消息确认机制:默认手动确认,需要消费者反馈给服务器一个确认信号,告知服务器可以在消息队列中删除消息,如果服务器未收到确认信号若消费者断开链接,将会把消息发送给下一个消费者,使得消息不会丢失。自动确认,服务器端会在消费者取得消息后在队列中删除消息,若此时消费者未处理完消息宕掉了,消息会丢失。如果手动确认忘记反馈确认信号,则消息在服务器占用的内存越来越多,应该记得反馈确认信号给服务器;
生产者:
package queue;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SendNewTask {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String[] msg={"hello...word...","hash...map..."};
String message = getMessage(msg);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
消费者 1
package queue;
import java.io.IOException;
import java.util.Calendar;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReqvWork {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息
//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认,反馈确认信息
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
消费者2
package queue;
import java.io.IOException;
import java.util.Calendar;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReqvWork1 {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息
//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x1] Received '" + message + "'");
System.out.println(" [x1] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
2 消息确认机制:默认手动确认,需要消费者反馈给服务器一个确认信号,告知服务器可以在消息队列中删除消息,如果服务器未收到确认信号若消费者断开链接,将会把消息发送给下一个消费者,使得消息不会丢失。自动确认,服务器端会在消费者取得消息后在队列中删除消息,若此时消费者未处理完消息宕掉了,消息会丢失。如果手动确认忘记反馈确认信号,则消息在服务器占用的内存越来越多,应该记得反馈确认信号给服务器;
生产者:
package queue;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SendNewTask {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String[] msg={"hello...word...","hash...map..."};
String message = getMessage(msg);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
消费者 1
package queue;
import java.io.IOException;
import java.util.Calendar;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReqvWork {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息
//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认,反馈确认信息
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
消费者2
package queue;
import java.io.IOException;
import java.util.Calendar;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReqvWork1 {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息
//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x1] Received '" + message + "'");
System.out.println(" [x1] done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
相关文章推荐
- RabbitMQ 消息轮询和消息确认机制
- RabbitMQ之消息确认机制(事务+Confirm)
- RabbitMQ(三)消息确认机制(事务+Confirm)
- RabbitMQ使用场景练习:消息确认机制(十一)
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ - Publisher的消息确认机制
- (六)RabbitMQ消息队列-消息任务分发与消息ACK确认机制(PHP版)
- RabbitMQ使用场景练习:消息确认机制(十一)
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ之消息确认机制(事务+Confirm)
- (六)RabbitMQ消息队列-消息任务分发与消息ACK确认机制(PHP版)
- RabbitMQ---9、消息确认机制(事务+Confirm)
- RabbitMQ消息队列(九):Publisher的消息确认机制
- (转)RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订