您的位置:首页 > 产品设计 > UI/UE

RabbitMQ Java官方教程(二)----Work Queues

2018-01-19 11:52 351 查看

RabbitMQ Java官方教程(二)----Work Queues

工作队列
(使用Java客户端)


              


在第一篇教学中,我们写了从一个指定的对列发送和接收消息的程序。在本文中,我们将创建一个工作队列(Work
Queues),用于将耗时的任务分配给多个工作人员。

工作队列(又叫任务队列)的主要目的是想要避免在执行大量而密集的资源任务时,必须立即执行并且必须等待任务完成后才能继续其他事情的情况。针对这种情况,我们会把任务放在以后做。我们将任务封装成一个消息并将其发送到一个队列。在后台运行的工作进程会取出任务并最终执行任务。当你运行很多“员工”时,任务将在他们中间共享。

这个概念在web应用程序中特别有用,因为在普通的HTTP请求窗口中无法处理复杂的任务。

 

准备工作

在本教程的前一部分中,我们发送了一个包含“Hello World!”的消息。现在我们将发送更为复杂的字符串。我们没有实际的任务,比如要调整大小的图像或要呈现的pdf文件,所以让我们虚拟我们很忙——通过使用Thread.sleep()函数来虚拟它。我们将把字符串中的点数作为他的复杂度:每个点都将占一秒钟的“工作”。例如,虚拟任务“Hello...”将会占用三秒的时间。我们将稍微的修改一下之前的程序代码(Send.java),让他可以发送任意的消息。这个程序将把任务安排到我们的工作队列中,所以让我们把它命名为NewTask.java:

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");


写一些帮助获得信息的代码:

private static String getMessage(String[] strings){

if (strings.length < 1)

return "Hello World!";

return joinStrings(strings, " ");

}

private static String joinStrings(String[] strings, String delimiter) {

int length = strings.length;

if (length == 0) return "";

StringBuilder words = new StringBuilder(strings[0]);

for (int i = 1; i < length; i++) {

words.append(delimiter).append(strings[i]);

}

return words.toString();

}


 我们之前的Recv.java程序也需要做一些修改:它要为消息体中的每个点伪造一个一秒钟的工作。它将处理传递过来的消息并执行任务,所以我们将它命名为Worker.java:

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

doWork(message);

} finally {

System.out.println(" [x] Done");

}

}

};

boolean autoAck = true; // acknowledgment is covered below

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);


模拟任务的执行时间:

private static void doWork(String task) throws InterruptedException {

for (char ch: task.toCharArray()) {

if (ch == '.') Thread.sleep(1000);

}

}


 其实现在就可以运行并看到效果了,运行多个worker,你会发现当运行NewTask的时候,RabbitMQ
会默认的依次将每个消息发送个下一个使用者。平均每个消费者将的到相同数量的消息。(例如,有两个worker那么他们将会交替循环接收到每条下发的消息)

源码放在最后,我们先看一下其余的东西。

回复确认消息

完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始执行了一个需要长时间执行的任务,但只完成了部分就挂掉了怎么办。就目前的代码来看,一旦RabbitMQ向客户端推送一条消息,那这条消息马上会被标记并删除掉。在这种情况下,如果你挂掉一个worker,那么我们将失去他所执行那条消息,我们也将丢失所有发送给这个worker但还没来得及处理的消息。

但我们不想丢失任何任务。如果一个worker不工作了 ,我们希望把任务递交给另外一个worker执行。

为了确保信息永远不会丢失。RabbitMQ支持消息应答(将会在后面的教程详细介绍)。消费者返回一个确认应答,告诉RabbitMQ一个特定的消息已经被接收、处理,可以随便删除掉了。

如果一个消费者挂掉了(对应的通道关闭了、连接被关闭了或者TCP连接丢失等)导致没有发送确认回复。那么RabbitMQ就会知道,这条消息没有被执行完,并会将这条消息重新排入队列。如果此时有其他的消费者在线,它会迅速的将其转发给另一个消费者。这样你就可以确保没有消息会丢失了,就算workers偶尔会挂掉。

这里没有什么消息失效时间的限制。当消费者挂了的时候,RabbitMQ将会重新发送消息。这样的话,就算是处理消息需要非常非常长的时间也没有关系。

默认情况下消息确认回复是打开的。在之前的例子中,我们通过autoAck = true标记将其关闭了。现在让我们将其设置成false。这样的话,一旦任务完成,worker将会回复一条确认消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

doWork(message);

} finally {

System.out.println(" [x] Done");

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};boolean autoAck = false;

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);


使用这个你就不用担心在执行任务试你关闭了一个worker会丢失信息了。当worker挂掉后,很快的,所有的未确认消息都将被重新发送。

 

忘记回复确认消息

未回复最基本的确认消息是很常见的错误。这错误很简单,但它的影响却很严重。

当你的客户端退出时,消息将被重新发送(这个看起来像是随机的),但RabbitMQ将会吃掉越来越多的内存,因为它无法释放任何未回复确认信息的消息。

为了调试这种错误,你可以使用rabbirmqctl去打印出messages_unacknowledged文件:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我们已经学会了就算消费者挂掉了,也可以确保任务不会丢失的方法。但如果RabbirMQ服务器停止了我们的任务还是会丢失啊!

如果RabbitMQ退出或者崩溃的话,它将会清掉所有的队列和消息。除非。。。你告诉他不要。需要两件事儿来确保消息不
d4e3
会丢失:我们要将队列和消息都标记为持久的。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要声明它是持久的:

boolean durable = true;

channel.queueDeclare("hello", durable, false, false, null);


这条指令本身是正确的,但它在我们目前的设置中是不起作用的。这是因为我们已经定义了一个叫做“hello”的队列,它不是持久的。RabbitMQ不允许你用不同的参数去重新定义一个已经存在的队列。如果你试图这么做,它会报错。这里有一个快速的解决方案——让我们用不同的名称来声明一个队列,比如说“task_queue”:

boolean durable = true;

channel.queueDeclare("task_queue", durable, false, false, null);


这里的“queueDeclare”需要在生产者和消费者的代码上同时修改。

此时,我们就可以确保就算RabbitMQ重启,“task_queue”队列也不会丢失。接下来我们要去将消息标记为持久的——通过将MessageProperties(实现了Basicproperties接口)的值设置为“PERSISTENT_TEXT_PLAIN”。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",

MessageProperties.PERSISTENT_TEXT_PLAIN,

message.getBytes());


 

需要注意的地方

标记消息为持久的并不能完全的确保消息不会丢失。尽管这指示了RabbitMQ将消息存到磁盘,但RabbitMQ接收到消息到保存消息的中间还是有一个短的时间段。而且,RabbitMQ不是将每条消息都存入磁盘的,它可能只是将其保存在缓存而不是真的写入磁盘。持久性保证功能并不强大,但对于我们简单的任务队列来说是足够用了的。如果你需要更强大的保证,你可以使用发布者确认模式(publisher
confirms,后面有机会会具体讲解)

 

平均分配

你可能注意到了,任务的分派仍然是不能完全按照我们的安排工作。举个例子,试想这里有两个“worker”,当消息特别扯淡的时候,比如说特别繁重或者特别轻松。然后恰巧是一个“worker”不断的忙碌着,而另一个却激活不用做什么工作。这里,RabbitMQ并不会知道这种情况,它只是依然均匀的分配任务。可能就是繁重的依然分配给了同一个“worker”,而另一个一直被分配到轻松的,这显然不是我们想要的。

这种情况的发生是因为RabbitMQ只是当消息进入到队列中后就分发,而不考虑消费者有多少条消息没有返回确认完成的回复了。它就只是盲目的将第n个消息推送给第n个消费者。

         


为了杜绝这种情况,我们可以使用basicQos方法,并设置参数prefetchCount=1。它告诉RabbitMQ不要给一个“worker”一次发送一条以上的消息。或者换句话说,在收到前一条消息的确认完成回复之前,不会给“worker”发送新的消息
。取而代之的是,把待处理的消息发送给其他的不忙的“worker”:

int prefetchCount = 1;

channel.basicQos(prefetchCount);


这里要说一下,要注意队列的大小。就是说,如果所有的“worker”都是忙碌的,那么你的队列可能会被填满。你需要关注这个问题以便处理,可以增加更多的“worker”或者采取一些其他的策略。

 整合代码

好了,说了这么多,下面我们把上面的零散碎片整合到一起!

最终的NewTask.java Class:

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.MessageProperties;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = getMessage(argv);

channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

channel.close();

connection.close();

}

//...

}


NewTask.java源码

 

然后是我们的Worker.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

final Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

doWork(message);

} finally {

System.out.println(" [x] Done");

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

boolean autoAck = false;

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}

private static void doWork(String task) {

for (char ch : task.toCharArray()) {

if (ch == '.') {

try {

Thread.sleep(1000);

} catch (InterruptedException _ignored) {

Thread.currentThread().interrupt();

}

}

}

}

}


Worker.java源码

使用确认完成回复和prefetchCount你可以设置一个工作队列。使用持久化设置可以使消息得以保存,即使是RabbitMQ重新启动。

有关通道(Channel)方法和消息属性(MessageProperties)的更多信息,可以在线浏览JavaDocs

 

Worker Queues教程到这里就结束了,接下来我们会学习第三部分教程,学习如何向多个消费者发送同样的消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java RabbitMQ