如何在ASP.NET Core中使用Azure Service Bus Queue
原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod
译文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代码: https://github.com/lamondlu/AzureServiceBusMessaging
本文展示了如何使用Azure Service Bus Queue, 实现2个ASP.NET Core Api应用之间的消息传输。
配置Azure Service Bus Queue
你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。
与Queue不同,Topic提供的是一对多的通讯方式。
架构图
整个应用的实现如下:
- Api 1负责发送消息
- Api 2负责监听Azure Service Bus,并处理接收到的消息
实现一个Service Bus Queue
这里我们首先需要引入** Microsoft.Azure.ServiceBus ** 程序集。Microsoft.Azure.ServiceBus是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。
在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。
为了发送向Azure Service Bus Queue发送消息,我们需要创建一个
SendMessage方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型
MyPayload, 将当前该
MyPayload对象序列化成Json字符串, 添加到一个
Message对象中。
using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Configuration; using Newtonsoft.Json; using System.Text; using System.Threading.Tasks; namespace ServiceBusMessaging { public class ServiceBusSender { private readonly QueueClient _queueClient; private readonly IConfiguration _configuration; private const string QUEUE_NAME = "simplequeue"; public ServiceBusSender(IConfiguration configuration) { _configuration = configuration; _queueClient = new QueueClient( _configuration .GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); } public async Task SendMessage(MyPayload payload) { string data = JsonConvert.SerializeObject(payload); Message message = new Message(Encoding.UTF8.GetBytes(data)); await _queueClient.SendAsync(message); } } }
在API 1和API 2中,我们需要将
ServiceBusSender注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册
Swagger服务。
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddScoped<ServiceBusSender>(); services.AddSwaggerGen(c => { c.SwaggerDoc("v1", new Info { Version = "v1", Title = "Payload View API", }); }); }
接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。
在API1中,我们创建一个POST方法,这个方法会将API接收到
Payload对象发送到Azure Service Bus Queue中。
[HttpPost] [ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)] [ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)] public async Task<IActionResult> Create([FromBody][Required]Payload request) { if (data.Any(d => d.Id == request.Id)) { return Conflict($"data with id {request.Id} already exists"); } data.Add(request); // Send this to the bus for the other services await _serviceBusSender.SendMessage(new MyPayload { Goals = request.Goals, Name = request.Name, Delete = false }); return Ok(request); }
从Queue中获取消息
为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类
ServiceBusConsumer,
ServiceBusConsumer实现了
IServiceBusConsumer接口。
Queue的连接字符串是使用
IConfiguration读取的。
RegisterOnMessageHandlerAndReceiveMessages方法负责注册消息处理程序
ProcessMessagesAsync处理消息。
ProcessMessagesAsync方法会将得到的消息转换成对象,并调用
IProcessData接口完成最终的消息处理。
using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ServiceBusMessaging { public interface IServiceBusConsumer { void RegisterOnMessageHandlerAndReceiveMessages(); Task CloseQueueAsync(); } public class ServiceBusConsumer : IServiceBusConsumer { private readonly IProcessData _processData; private readonly IConfiguration _configuration; private readonly QueueClient _queueClient; private const string QUEUE_NAME = "simplequeue"; private readonly ILogger _logger; public ServiceBusConsumer(IProcessData processData, IConfiguration configuration, ILogger<ServiceBusConsumer> logger) { _processData = processData; _configuration = configuration; _logger = logger; _queueClient = new QueueClient( _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); } public void RegisterOnMessageHandlerAndReceiveMessages() { var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 1, AutoComplete = false }; _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); } private async Task ProcessMessagesAsync(Message message, CancellationToken token) { var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body)); _processData.Process(myPayload); await _queueClient.CompleteAsync(message.SystemProperties.LockToken); } private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception"); var context = exceptionReceivedEventArgs.ExceptionReceivedContext; _logger.LogDebug($"- Endpoint: {context.Endpoint}"); _logger.LogDebug($"- Entity Path: {context.EntityPath}"); _logger.LogDebug($"- Executing Action: {context.Action}"); return Task.CompletedTask; } public async Task CloseQueueAsync() { await _queueClient.CloseAsync(); } } }
其中
IProcessData接口存在于类库项目
ServiceBusMessaging中,它是用来处理消息的。
public interface IProcessData { void Process(MyPayload myPayload); }
在Api 2中,我们创建一个
ProcessData类,它实现了
IProcessData接口。
public class ProcessData : IProcessData { public void Process(MyPayload myPayload) { DataServiceSimi.Data.Add(new Payload { Name = myPayload.Name, Goals = myPayload.Goals }); } }
这里为了简单测试,我们创建了一个静态类
DataServiceSimi,其中存放了API2中所有保存Payload对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController,在其中添加了一个GET Action,并返回了静态类DataServiceSimi中的所有数据。
[Route("api/[controller]")] [ApiController] public class ViewPayloadMessagesController : ControllerBase { [HttpGet] [ProducesResponseType(StatusCodes.Status200OK)] public ActionResult<List<Payload>> Get() { return Ok(DataServiceSimi.Data); } }
最后我们还需要将
ProcessData注册到API2的IOC容器中。
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>(); services.AddTransient<IProcessData, ProcessData>(); }
最终效果
现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload
操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类
DataServiceSimi中。
- [翻译] 如何在 ASP.Net Core 中使用 Consul 来存储配置
- asp.net core项目中如何使用html文件
- ASP.NET Core中如何使用表达式树创建URL详解
- 如何在ASP.NET Core中使用Redis
- 如何使用Rotativa在ASP.NET Core MVC中创建PDF详解
- ASP.NET Core中的缓存[1]:如何在一个ASP.NET Core应用中使用缓存
- ASP.NET Core中如何针对一个使用HttpClient对象的类编写单元测试
- 详解如何在ASP.NET Core中使用Redis
- 如何使用ASP.NET Core实现搜索功能
- 如何在ASP.NET Core Web API测试中使用Postman
- ASP.NET Core 中文文档 第四章 MVC(3.4)如何使用表单
- ASP.NET Core如何使用Entity Framework
- 如何在ASP.NET Core中使用Redis
- 如何在ASP.NET Core 2.0中使用Razor页面
- 如何在ASP.NET Core中使用JSON Patch
- 项目开发中的一些注意事项以及技巧总结 基于Repository模式设计项目架构—你可以参考的项目架构设计 Asp.Net Core中使用RSA加密 EF Core中的多对多映射如何实现? asp.net core下的如何给网站做安全设置 获取服务端https证书 Js异常捕获
- ASP.NET Core中如何针对一个使用HttpClient对象的类编写单元测试
- [dotnetCore2.0]学习笔记之二: ASP.NET Core中,如何灵活使用静态文件和加载自定义配置
- Asp.net Core中如何使用中间件来管理websocket
- Asp.Net 如何在Server端如何使用非系统默认安装字体?