RabbitMQ 官方NET教程(四)【路由选择】
2017-06-02 14:21
405 查看
在上一个教程中,我们构建了一个简单的日志记录系统。 我们能够广播日志消息给所有你的接收者。
在本教程中,我们将为其添加一个功能 - 我们将让日志接收者可以仅订阅一部分消息。 例如,我们将能够仅将关键的错误消息写入到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定表示转发器与队列之间的关系。我们也可以简单的认为:队列对该转发器上的消息感兴趣。
绑定可以附带一个额外的参数routingKey。 为了避免与
绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。
我们正在使用一个
我们会使用
为了说明,请考虑以下设置:
在这个设置中,我们可以看到
在这样的设置中,发布附带一个选择键(routing key)
使用相同的绑定键绑定多个队列是完全合法的。 在我们的示例中,我们可以在
一如以往,我们需要先创建一个转发器:
然后我们准备发送一条消息:
为了简化代码,我们假定
EmitLogDirect.cs 类的代码:
ReceiveLogsDirect.cs的代码:
如果您只想将
如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:
而且,例如,要发出
在本教程中,我们将为其添加一个功能 - 我们将让日志接收者可以仅订阅一部分消息。 例如,我们将能够仅将关键的错误消息写入到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定(Bindings)
在以前的例子中,我们已经使用过绑定。类似下面的代码:channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
绑定表示转发器与队列之间的关系。我们也可以简单的认为:队列对该转发器上的消息感兴趣。
绑定可以附带一个额外的参数routingKey。 为了避免与
BasicPublish参数混淆,我们将其称为
binding key。 这就是我们如何用一个键创建一个绑定:
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black");
绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。
直接转发(Direct exchange)
我们从上一个教程的日志记录系统向所有消费者广播所有消息。 我们希望将其扩展为允许基于其严重性进行过滤日志消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。我们正在使用一个
fanout的交换机,它不给我们很大的灵活性 - 它只能无意识地转发。
我们会使用
direct转发器。
direct类型的转发器背后的路由算法很简单 - 消息传递到
binding key与消息的
routing key完全匹配的队列。
为了说明,请考虑以下设置:
在这个设置中,我们可以看到
direct类型的转发器
X与两个队列绑定。 第一个队列与绑定键
orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键
black绑定,另一个与
green绑定键绑定。
在这样的设置中,发布附带一个选择键(routing key)
orange的消息至交换机,将被导向到队列Q1。 消息附带一个选择键 (routing key)
black或者
green将会被导向到Q2。 所有其他消息将被丢弃。
多重绑定(multiple bindings)
使用相同的绑定键绑定多个队列是完全合法的。 在我们的示例中,我们可以在
X和
Q1之间添加绑定键
black。 在这种情况下,
direct交换将表现得像
fanout,并将消息广播到所有匹配的队列。 附带选择键
black的消息将传送到Q1和Q2。
发送日志(Emittinglogs)
我们将此模型用于日志记录系统。我们将消息发送到direct类型的转发器而不是
fanout类型。这样的话, 接收程序可以根据严重性来选择接收。 我们首先关注发送日志的代码:
一如以往,我们需要先创建一个转发器:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
然后我们准备发送一条消息:
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
为了简化代码,我们假定
severity是
info,
warning,
error中的一个。
订阅
接收消息将像上一个教程类似,只有一点不同 - 我们将为每个我们感兴趣的严重性类型的日志创建一个新的绑定。var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); }
完整的实例
EmitLogDirect.cs 类的代码:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip( 1 ).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
ReceiveLogsDirect.cs的代码:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogsDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
如果您只想将
warning和
error(而不是
info)保存到文件中,只需打开控制台并键入:
cd ReceiveLogsDirect dotnet run warning error > logs_from_rabbit.log
如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:
cd ReceiveLogsDirect dotnet run info warning error # => [*] Waiting for logs. To exit press CTRL+C
而且,例如,要发出
error日志消息,只需键入:
cd EmitLogDirect dotnet run error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.'
相关文章推荐
- RabbitMQ 官方NET教程(四)【路由选择】
- RabbitMQ 官方NET教程(五)【Topic】
- RabbitMQ 官方NET教程(二)【工作队列】
- RabbitMQ 官方NET教程(六)【RPC】
- RabbitMQ 官方NET教程(三)【发布/订阅】
- RabbitMQ 官方NET教程(二)【工作队列】
- RabbitMQ 官方NET教程(六)【RPC】
- RabbitMQ 官方NET教程(五)【Topic】
- RabbitMQ 官方NET教程(一)【介绍】
- RabbitMQ 官方NET教程(一)【介绍】
- RabbitMQ 官方NET教程(三)【发布/订阅】
- Firefly官方教程之Netconnect使用文档
- Firefly官方教程之Netconnect使用文档
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第一部分:Hello World
- RabbitMQ官方中文入门教程(PHP版) 第一部分:Hello World
- PHP版 RabbitMQ官方中文入门教程
- ASP.NET AJAX服务器端控件官方视频教程 下载--英文
- RabbitMQ实例教程:路由选择
- ASP.NET MVC 官方教程