RabbitMQ Java官方教程(二)----Work Queues
2018-01-19 11:52
351 查看
RabbitMQ Java官方教程(二)----Work Queues
工作队列(使用Java客户端)
在第一篇教学中,我们写了从一个指定的对列发送和接收消息的程序。在本文中,我们将创建一个工作队列(Work
Queues),用于将耗时的任务分配给多个工作人员。
工作队列(又叫任务队列)的主要目的是想要避免在执行大量而密集的资源任务时,必须立即执行并且必须等待任务完成后才能继续其他事情的情况。针对这种情况,我们会把任务放在以后做。我们将任务封装成一个消息并将其发送到一个队列。在后台运行的工作进程会取出任务并最终执行任务。当你运行很多“员工”时,任务将在他们中间共享。
这个概念在web应用程序中特别有用,因为在普通的HTTP请求窗口中无法处理复杂的任务。
准备工作
在本教程的前一部分中,我们发送了一个包含“Hello World!”的消息。现在我们将发送更为复杂的字符串。我们没有实际的任务,比如要调整大小的图像或要呈现的pdf文件,所以让我们虚拟我们很忙——通过使用Thread.sleep()函数来虚拟它。我们将把字符串中的点数作为他的复杂度:每个点都将占一秒钟的“工作”。例如,虚拟任务“Hello...”将会占用三秒的时间。我们将稍微的修改一下之前的程序代码(Send.java),让他可以发送任意的消息。这个程序将把任务安排到我们的工作队列中,所以让我们把它命名为NewTask.java:
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
写一些帮助获得信息的代码:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
我们之前的Recv.java程序也需要做一些修改:它要为消息体中的每个点伪造一个一秒钟的工作。它将处理传递过来的消息并执行任务,所以我们将它命名为Worker.java:
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
模拟任务的执行时间:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
其实现在就可以运行并看到效果了,运行多个worker,你会发现当运行NewTask的时候,RabbitMQ
会默认的依次将每个消息发送个下一个使用者。平均每个消费者将的到相同数量的消息。(例如,有两个worker那么他们将会交替循环接收到每条下发的消息)
源码放在最后,我们先看一下其余的东西。
回复确认消息
完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始执行了一个需要长时间执行的任务,但只完成了部分就挂掉了怎么办。就目前的代码来看,一旦RabbitMQ向客户端推送一条消息,那这条消息马上会被标记并删除掉。在这种情况下,如果你挂掉一个worker,那么我们将失去他所执行那条消息,我们也将丢失所有发送给这个worker但还没来得及处理的消息。
但我们不想丢失任何任务。如果一个worker不工作了 ,我们希望把任务递交给另外一个worker执行。
为了确保信息永远不会丢失。RabbitMQ支持消息应答(将会在后面的教程详细介绍)。消费者返回一个确认应答,告诉RabbitMQ一个特定的消息已经被接收、处理,可以随便删除掉了。
如果一个消费者挂掉了(对应的通道关闭了、连接被关闭了或者TCP连接丢失等)导致没有发送确认回复。那么RabbitMQ就会知道,这条消息没有被执行完,并会将这条消息重新排入队列。如果此时有其他的消费者在线,它会迅速的将其转发给另一个消费者。这样你就可以确保没有消息会丢失了,就算workers偶尔会挂掉。
这里没有什么消息失效时间的限制。当消费者挂了的时候,RabbitMQ将会重新发送消息。这样的话,就算是处理消息需要非常非常长的时间也没有关系。
默认情况下消息确认回复是打开的。在之前的例子中,我们通过autoAck = true标记将其关闭了。现在让我们将其设置成false。这样的话,一旦任务完成,worker将会回复一条确认消息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } };boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用这个你就不用担心在执行任务试你关闭了一个worker会丢失信息了。当worker挂掉后,很快的,所有的未确认消息都将被重新发送。
忘记回复确认消息
未回复最基本的确认消息是很常见的错误。这错误很简单,但它的影响却很严重。
当你的客户端退出时,消息将被重新发送(这个看起来像是随机的),但RabbitMQ将会吃掉越来越多的内存,因为它无法释放任何未回复确认信息的消息。
为了调试这种错误,你可以使用rabbirmqctl去打印出messages_unacknowledged文件:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了就算消费者挂掉了,也可以确保任务不会丢失的方法。但如果RabbirMQ服务器停止了我们的任务还是会丢失啊!
如果RabbitMQ退出或者崩溃的话,它将会清掉所有的队列和消息。除非。。。你告诉他不要。需要两件事儿来确保消息不
d4e3
会丢失:我们要将队列和消息都标记为持久的。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要声明它是持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
这条指令本身是正确的,但它在我们目前的设置中是不起作用的。这是因为我们已经定义了一个叫做“hello”的队列,它不是持久的。RabbitMQ不允许你用不同的参数去重新定义一个已经存在的队列。如果你试图这么做,它会报错。这里有一个快速的解决方案——让我们用不同的名称来声明一个队列,比如说“task_queue”:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
这里的“queueDeclare”需要在生产者和消费者的代码上同时修改。
此时,我们就可以确保就算RabbitMQ重启,“task_queue”队列也不会丢失。接下来我们要去将消息标记为持久的——通过将MessageProperties(实现了Basicproperties接口)的值设置为“PERSISTENT_TEXT_PLAIN”。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
需要注意的地方
标记消息为持久的并不能完全的确保消息不会丢失。尽管这指示了RabbitMQ将消息存到磁盘,但RabbitMQ接收到消息到保存消息的中间还是有一个短的时间段。而且,RabbitMQ不是将每条消息都存入磁盘的,它可能只是将其保存在缓存而不是真的写入磁盘。持久性保证功能并不强大,但对于我们简单的任务队列来说是足够用了的。如果你需要更强大的保证,你可以使用发布者确认模式(publisher
confirms,后面有机会会具体讲解)
平均分配
你可能注意到了,任务的分派仍然是不能完全按照我们的安排工作。举个例子,试想这里有两个“worker”,当消息特别扯淡的时候,比如说特别繁重或者特别轻松。然后恰巧是一个“worker”不断的忙碌着,而另一个却激活不用做什么工作。这里,RabbitMQ并不会知道这种情况,它只是依然均匀的分配任务。可能就是繁重的依然分配给了同一个“worker”,而另一个一直被分配到轻松的,这显然不是我们想要的。
这种情况的发生是因为RabbitMQ只是当消息进入到队列中后就分发,而不考虑消费者有多少条消息没有返回确认完成的回复了。它就只是盲目的将第n个消息推送给第n个消费者。
为了杜绝这种情况,我们可以使用basicQos方法,并设置参数prefetchCount=1。它告诉RabbitMQ不要给一个“worker”一次发送一条以上的消息。或者换句话说,在收到前一条消息的确认完成回复之前,不会给“worker”发送新的消息
。取而代之的是,把待处理的消息发送给其他的不忙的“worker”:
int prefetchCount = 1; channel.basicQos(prefetchCount);
这里要说一下,要注意队列的大小。就是说,如果所有的“worker”都是忙碌的,那么你的队列可能会被填满。你需要关注这个问题以便处理,可以增加更多的“worker”或者采取一些其他的策略。
整合代码
好了,说了这么多,下面我们把上面的零散碎片整合到一起!
最终的NewTask.java Class:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
(NewTask.java源码)
然后是我们的Worker.java:
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
(Worker.java源码)
使用确认完成回复和prefetchCount你可以设置一个工作队列。使用持久化设置可以使消息得以保存,即使是RabbitMQ重新启动。
有关通道(Channel)方法和消息属性(MessageProperties)的更多信息,可以在线浏览JavaDocs。
Worker Queues教程到这里就结束了,接下来我们会学习第三部分教程,学习如何向多个消费者发送同样的消息。
相关文章推荐
- RabbitMQ Java官方教程(五)----Topics
- RabbitMQ入门教程 For Java【2】 - Work Queues
- RabbitMQ官方中文入门教程(PHP版) 第二部分:工作队列(Work queues)
- RabbitMQ官方中文入门教程(PHP版) 第二部分:工作队列(Work queues)
- RabbitMQ入门教程 For Java【2】 - Work Queues
- RabbitMQ Java官方教程(四)----Routing
- RabbitMQ Java官方教程(一)----Hello World
- RabbitMQ官方中文入门教程(PHP版) 第二部分:工作队列(Work queues)
- RabbitMQ Java官方教程(三)----Publish/Subscribe
- RabbitMQ官方教程之二 “工作队列”
- PHP版 RabbitMQ官方中文入门教程
- 智渔课堂官方免费教程四十二 :Java流之字节流 输入流和输出流 InputStream和OutputStream
- java 并发官方教程
- java实现rabbitMQ延时队列详解以及spring-rabbit整合教程
- RabbitMQ 官方NET教程(五)【Topic】
- RabbitMQ入门教程 For Java【9】 - 与Spring集成
- RabbitMQ实例教程:Hello RabbitMQ World之Java实现
- RabbitMQ实例教程:用Java搞定工作队列
- 智渔课堂官方免费教程十四:Java面向对象三大特性之继承
- Java并发教程(Oracle官方资料)