您的位置:首页 > 其它

RabbitMQ 消息轮询和消息确认机制

2017-03-08 17:52 417 查看
1 消息轮询:默认的情况下,服务器会将消息发送给下一个消费者,使得每个消费者获得消息数量尽量相同。

2 消息确认机制:默认手动确认,需要消费者反馈给服务器一个确认信号,告知服务器可以在消息队列中删除消息,如果服务器未收到确认信号若消费者断开链接,将会把消息发送给下一个消费者,使得消息不会丢失。自动确认,服务器端会在消费者取得消息后在队列中删除消息,若此时消费者未处理完消息宕掉了,消息会丢失。如果手动确认忘记反馈确认信号,则消息在服务器占用的内存越来越多,应该记得反馈确认信号给服务器;

生产者:

package queue;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SendNewTask {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String[] msg={"hello...word...","hash...map..."};
String message = getMessage(msg);

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");

channel.close();
connection.close();

}

private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


消费者 1
package queue;

import java.io.IOException;
import java.util.Calendar;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReqvWork {
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息

//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认,反馈确认信息
}

}

private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}

}


消费者2
package queue;

import java.io.IOException;
import java.util.Calendar;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReqvWork1 {
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel); //QueueingConsumer缓存从服务器发来的消息

//第二个参数true,自动确认机制,如果消息消费者和服务器断开链接会丢失消息;false 关闭自动确认机制,手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //在另一个来自服务器的消息到来之前它会一直阻塞着
String message = new String(delivery.getBody());
doWork(message);
System.out.println(" [x1] Received '" + message + "'");
System.out.println(" [x1] done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动消息确认
}

}

private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: