Using NServiceBus with RabbitMQ
2016-11-30 17:09
This post follows the official tutorial:
Step by Step Guide
Create four projects:
- A Console Application named Client
- A Console Application named Server
- A Console Application named Subscriber
- A Class Library named Shared
Target .NET Framework 4.6 or later in every project; the code further down depends on it.
Client, Server, and Subscriber each reference Shared.
Install the NServiceBus package in all four projects:
    Install-Package NServiceBus
Install the NServiceBus.RabbitMQ package in the three console projects:
    Install-Package NServiceBus.RabbitMQ
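Shared code:

    using System;
    using NServiceBus;

    // Command: asks the Server to place an order.
    public class PlaceOrder : ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; }
    }

    // Event: published by the Server after it has handled a PlaceOrder command.
    public class OrderPlaced : IEvent
    {
        public Guid OrderId { get; set; }
    }

    // Command: asks the Server to arrange shipping.
    public class PlaceShipping : ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; }
    }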
Client code:

    using System;
    using System.Threading.Tasks;
    using NServiceBus;

    namespace Client
    {
        class Program
        {
            static void Main(string[] args)
            {
                AsyncMain().GetAwaiter().GetResult();
            }

            static async Task AsyncMain()
            {
                Console.Title = "Sample.StepByStep.Client";
                var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
                endpointConfiguration.SendFailedMessagesTo("error");
                var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
                transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
                endpointConfiguration.UseSerialization<JsonSerializer>();
                endpointConfiguration.EnableInstallers();
                endpointConfiguration.UsePersistence<InMemoryPersistence>();

                var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
                try
                {
                    await SendOrder(endpointInstance).ConfigureAwait(false);
                }
                finally
                {
                    // Stop the endpoint on a normal exit as well as on failure.
                    await endpointInstance.Stop().ConfigureAwait(false);
                }
            }

            private static async Task SendOrder(IEndpointInstance endpointInstance)
            {
                Console.WriteLine("Press enter to send a message");
                Console.WriteLine("Press any other key to exit");
                while (true)
                {
                    var key = Console.ReadKey();
                    Console.WriteLine();
                    if (key.Key != ConsoleKey.Enter)
                    {
                        return;
                    }

                    var id = Guid.NewGuid();
                    var id2 = Guid.NewGuid();
                    var placeOrder = new PlaceOrder { Product = "New shoes", Id = id };
                    var placeShipping = new PlaceShipping { Product = "A-->B", Id = id2 };

                    // Both commands go to the Server endpoint's queue.
                    await endpointInstance.Send("Samples.StepByStep.Server", placeOrder).ConfigureAwait(false);
                    await endpointInstance.Send("Samples.StepByStep.Server", placeShipping).ConfigureAwait(false);
                    Console.WriteLine($"Sent a PlaceOrder message with id:{id:N}");
                    Console.WriteLine($"Sent a PlaceShipping message with id:{id2:N}");
                }
            }
        }
    }
Server code:

    using System;
    using System.Threading.Tasks;
    using NServiceBus;

    namespace Server
    {
        class Program
        {
            static void Main(string[] args)
            {
                AsyncMain().GetAwaiter().GetResult();
            }

            static async Task AsyncMain()
            {
                Console.Title = "Samples.StepByStep.Server";
                var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
                endpointConfiguration.UseSerialization<JsonSerializer>();
                endpointConfiguration.EnableInstallers();
                var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
                transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
                endpointConfiguration.UsePersistence<InMemoryPersistence>();
                endpointConfiguration.SendFailedMessagesTo("error");

                var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
                try
                {
                    Console.WriteLine("Press any key to exit");
                    Console.ReadKey();
                }
                finally
                {
                    await endpointInstance.Stop().ConfigureAwait(false);
                }
            }
        }
    }

    using System.Threading.Tasks;
    using NServiceBus;
    using NServiceBus.Logging;

    namespace Server
    {
        // Handles PlaceOrder commands and publishes an OrderPlaced event for each one.
        public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
        {
            static ILog log = LogManager.GetLogger<PlaceOrderHandler>();

            public Task Handle(PlaceOrder message, IMessageHandlerContext context)
            {
                log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
                log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");
                var orderPlaced = new OrderPlaced { OrderId = message.Id };
                return context.Publish(orderPlaced);
            }
        }
    }

    using System.Threading.Tasks;
    using NServiceBus;
    using NServiceBus.Logging;

    namespace Server
    {
        // Handles PlaceShipping commands; it only logs and publishes nothing.
        public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
        {
            static ILog log = LogManager.GetLogger<PlaceShippingHandler>();

            public Task Handle(PlaceShipping message, IMessageHandlerContext context)
            {
                log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
                return Task.CompletedTask;
            }
        }
    }
Why target 4.6 or later? The handlers return Task.CompletedTask, which is only available from .NET Framework 4.6 onward.
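If a project cannot target 4.6, a cached completed task can stand in for Task.CompletedTask. The following is a minimal sketch, not part of the original sample; the TaskCompat class and its Completed field are hypothetical names. It relies only on Task.FromResult, which exists since .NET Framework 4.5.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (assumption: not part of NServiceBus or the original post).
public static class TaskCompat
{
    // Task.FromResult returns an already-completed Task<int>, which is also a Task.
    // Caching a single instance avoids allocating a new task on every handler call.
    public static readonly Task Completed = Task.FromResult(0);
}
```

A handler would then end with `return TaskCompat.Completed;` instead of `return Task.CompletedTask;`.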
Subscriber code:

    using System;
    using System.Threading.Tasks;
    using NServiceBus;

    namespace Subscriber
    {
        class Program
        {
            static void Main(string[] args)
            {
                AsyncMain().GetAwaiter().GetResult();
            }

            static async Task AsyncMain()
            {
                Console.Title = "Samples.StepByStep.Subscriber";
                var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
                endpointConfiguration.UseSerialization<JsonSerializer>();
                endpointConfiguration.EnableInstallers();
                var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
                transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
                endpointConfiguration.UsePersistence<InMemoryPersistence>();
                endpointConfiguration.SendFailedMessagesTo("error");

                var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
                try
                {
                    Console.WriteLine("Press any key to exit");
                    Console.ReadKey();
                }
                finally
                {
                    await endpointInstance.Stop().ConfigureAwait(false);
                }
            }
        }
    }

    using System.Threading.Tasks;
    using NServiceBus;
    using NServiceBus.Logging;

    namespace Subscriber
    {
        // Handles the OrderPlaced event published by the Server.
        public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
        {
            static ILog log = LogManager.GetLogger<OrderCreatedHandler>();

            public Task Handle(OrderPlaced message, IMessageHandlerContext context)
            {
                log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
                return Task.CompletedTask;
            }
        }
    }
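Configure the solution to start multiple projects (Client, Server, and Subscriber).
Start the solution and press Enter in the Client console; the Server and Subscriber consoles then show the messages they receive.
The queues can also be inspected in the RabbitMQ management UI at http://10.255.20.44:15672/#/queues.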
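The queue list shown in the management UI is also exposed by the RabbitMQ management plugin's HTTP API under /api/queues. Below is a minimal sketch for reading it from code, assuming the management plugin is enabled on that host and port and that it accepts the same guest credentials as the connection string. The QueueLister class and its method names are hypothetical, and on .NET Framework the project needs a reference to System.Net.Http.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper (assumption: not part of NServiceBus or the original post).
public static class QueueLister
{
    // Builds an HTTP Basic authentication header from a user name and password.
    public static AuthenticationHeaderValue BasicAuth(string user, string password)
    {
        var token = Convert.ToBase64String(Encoding.UTF8.GetBytes(user + ":" + password));
        return new AuthenticationHeaderValue("Basic", token);
    }

    // Returns the raw JSON document that the management API serves for all queues.
    public static async Task<string> GetQueuesJsonAsync(string baseUrl, string user, string password)
    {
        using (var client = new HttpClient())
        {
            client.DefaultRequestHeaders.Authorization = BasicAuth(user, password);
            var url = baseUrl.TrimEnd('/') + "/api/queues";
            return await client.GetStringAsync(url).ConfigureAwait(false);
        }
    }
}
```

For the setup in this post the call would be `await QueueLister.GetQueuesJsonAsync("http://10.255.20.44:15672", "guest", "guest")`; the returned JSON can then be searched for the three endpoint queues and the error queue.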