您的位置:首页 > 其它

RabbitMQ 官方NET教程(二)【工作队列】

2017-06-08 22:10 459 查看
这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。

工作队列的主要任务是:避免立刻执行资源密集型任务和避免必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被这些工作进程共享执行。

这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。

准备

在本教程的前面部分,我们发送了一个包含
Hello World!
的消息。 现在我们将发送代替复杂任务的字符串。 我们没有一个现实世界的任务,比如图像被调整大小,或者是要渲染的pdf文件,所以假设我们很忙 - 通过使用
Thread.sleep()
函数来假冒它。 我们将把字符串中的点数作为其复杂度; 每个点都将占“work”的一秒钟。 例如,由H
ello...
描述的假任务将需要三秒钟。

我们将稍后从之前的例子中修改
Send
程序,以允许从命令行发送任意消息。 这个程序会将任务安排到我们的工作队列中,所以让我们命名为
NewTask


dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);

有些帮助从命令行参数获取消息:

private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

我们的旧的
Receive.cs
脚本还需要一些更改:它需要为消息体中的每个点伪造一秒的工作时间。 它将处理RabbitMQ发送的消息并执行任务,因此我们将其复制到
Worker
项目并修改:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);

我们假任务到模拟执行时间:

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

循环调度

使用任务队列的优点之一是能够轻松地并行工作。 如果我们正在建立积压的工作,我们可以增加更多的工作者,这样可以轻松扩展。

首先,我们同时尝试运行两个
Worker
实例。 他们都会从队列中获取消息,但是究竟如何? 让我们来看看。

你需要三个控制台打开。 两个将运行
Worker
程序。 这些控制台将是我们两个消费者 - C1和C2。

# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C


# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C

在第三个我们将发布新的任务。 一旦您已经开始使用消费者,您可以发布一些消息:

# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

让我们看看送给我们workers的内容:

# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。 平均每个消费者将获得相同数量的消息。 这种分发消息的方式叫做循环(round-robin)。 与三名或更多的workers一起尝试。

消息应答(message acknowledgments)

执行一个任务需要花费几秒钟。你可能会担心当一个消费者在执行任务时发生中断。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其从内存中删除。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。

但是我们不想失去任何任务。如果一个worker挂了,我们希望把这个任务交给另一个工作者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。从消费者发送一个确认信息告诉RabbitMQ已经收到,处理了特定的消息,然后RabbitMQ可以自由删除它。

如果消费者死机(其通道关闭,连接关闭或TCP连接丢失),而不发送确认信息,RabbitMQ将会明白消息未被完全处理并重新排队。如果同时有其他消费者在线,则会迅速将其重新提供给另一个消费者。这样就可以确保没有消息丢失,即使工作者偶然死亡。

没有任何消息超时; RabbitMQ将在消费者挂了时重新发送消息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。

消息确认默认情况下打开。 在前面的例子中,我们通过将
noAck
(“no manual acks”)参数设置为true来明确地将其关闭。 一旦完成任务,现在该删除这个标志并发送正确的确认。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);

int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

使用这个代码,我们可以确定即使在处理消息时,使用
CTRL + C
杀死一个工作者,也不会丢失任何东西。工作者挂了之后不久,所有未确认的消息将被重新发送。


忘记确认

丢失
BasicAck
是一个常见的错误。 这是一个容易的错误,但后果是严重的。

当您的客户端退出(可能看起来像随机重新传递)时,消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未包含的消息。


为了调试这种错误,您可以使用
rabbitmqctl
打印
messages_unacknowledged
字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化(Message durability)

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要丢失。需要两件事来确保消息不会丢失:我们需要将所有队列和消息标记为持久化。

首先,我们需要确保RabbitMQ不会丢失我们的队列。 为了这样做,我们需要将其声明为持久的:

channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。 这是因为我们已经定义了一个非持久化的名为hello的队列。 RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如
task_queue


channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

这个
queueDeclare
更改需要应用于生产者和消费者代码。

在这一点上,我们确信,即使RabbitMQ重新启动,
task_queue
队列也不会丢失。 现在我们需要将我们的消息标记为持久性 - 将
IBasicProperties.SetPersistent
设置为
true


var properties = channel.CreateBasicProperties();
properties.Persistent = true;

注意消息持久性

将消息标记为持久性不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时,仍然有一个很短的时间窗口。 此外,RabbitMQ不会对每个消息执行`fsync`(同步内存中所有已修改的文件数据到储存设备) - 它可能只是保存到缓存中,而不是真正写入磁盘。 持久性保证不强,但对我们的简单任务队列来说已经足够了。 如果您需要更强大的保证,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。

公平转发(Fair dispatch)

或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。

造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。

channel.BasicQos(0, 1, false);

注意队列大小

如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。

完整的代码

NewTask.cs 类:

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}

private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
}

Worker.cs类:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);

int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: