您的位置:首页 > 其它

NServiceBus 结合 RabbitMQ 使用

2016-11-30 17:09 429 查看
参考官方教程:

Step by Step Guide

新建4个项目:

A Console Application named
Client


A Console Application named
Server


A Console Application named
Subscriber


A Class Library named
Shared


Framework框架选择4.6及以上,后面有用到。

Client,Server,Subscriber引用Shared。

4个项目都安装NServiceBus包:

Install-Package NServiceBus


3个控制台项目安装NServiceBus.RabbitMQ包:

Install-Package NServiceBus.RabbitMQ


Share代码:

using NServiceBus;


public class PlaceOrder:ICommand
{
public Guid Id { get; set; }
public string Product { get; set; }
}


public class OrderPlaced:IEvent
{
public Guid OrderId { get; set; }
}


public class PlaceShipping:ICommand
{
public Guid Id { get; set; }
public string Product { get; set; }
}


Client代码:

namespace Client
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Sample.StepByStep.Client";
var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
endpointConfiguration.SendFailedMessagesTo("error");
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
try
{
await SendOrder(endpointInstance);
}
catch (Exception)
{
await endpointInstance.Stop().ConfigureAwait(false);
}
}

private static async Task SendOrder(IEndpointInstance endpointInstance)
{
Console.WriteLine("Press enter to send a message");
Console.WriteLine("Press any key to exit");
while(true)
{
var key = Console.ReadKey();
Console.WriteLine();
if(key.Key!=ConsoleKey.Enter)
{
return;
}
var id = Guid.NewGuid();
var id2 = Guid.NewGuid();
var placeOrder = new PlaceOrder
{
Product = "New shoes",
Id = id
};
var placeShipping = new PlaceShipping
{
Product = "A-->B",
Id = id2
};
await endpointInstance.Send("Samples.StepByStep.Server", placeOrder);
await endpointInstance.Send("Samples.StepByStep.Server", placeShipping);
Console.WriteLine($"Sent a PlaceOrder messge with id:{id:N}");
Console.WriteLine($"Sent a PlaceShipping messge with id:{id2:N}");
}
}
}
}


Server代码:

namespace Server
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Samples.StepByStep.Server";
var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendFailedMessagesTo("error");

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
try
{
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
finally
{
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
}
}


namespace Server
{
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
static ILog log = LogManager.GetLogger<PlaceOrderHandler>();

public Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

var orderPlaced = new OrderPlaced
{
OrderId = message.Id
};
return context.Publish(orderPlaced);
}
}
}


namespace Server
{
public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
{
static ILog log = LogManager.GetLogger<PlaceShippingHandler>();

public Task Handle(PlaceShipping message, IMessageHandlerContext context)
{
log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
return Task.CompletedTask;
}
}
}


为什么要选4.6以上,原因就在Task.CompletedTask需要4.6以上。

SubScribe代码:

namespace Subscriber
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Samples.StepByStep.Subscriber";
var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendFailedMessagesTo("error");

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
try
{
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
finally
{
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
}
}


namespace Subscriber
{
public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
{
static ILog log = LogManager.GetLogger<OrderCreatedHandler>();

public Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
return Task.CompletedTask;
}
}
}


选择多启动项目:



启动项目,在Client端按回车,可以看到Server端和Subscribe端的接收信息:



同时查看http://10.255.20.44:15672/#/queues:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: