使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性
2018-08-17 19:06
621 查看
public void DoStart() { // 1:从消息队列中取得需要处理的音频消息 Consumer consumer = new Consumer(MqConfig.MeidaQueueName); var channel = consumer.Channel; consumer.ReceivedEvent += (sender, args) => { var msg = Encoding.UTF8.GetString(args.Body); Console.WriteLine(args.RoutingKey + "\r\n" + msg); Console.WriteLine(); // 2:执行同步处理(一次只调用一个同步处理单元) var nonObj = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg); var nonBoy = JsonConvert.DeserializeObject<dynamic>(nonObj["Body"].ToString()); string forntFileUrl = nonBoy.frontFileUrl; int backgounedAudioIndex = nonBoy.backgounedAudioIndex; string taskName = nonBoy.taskName; // 调用同步方法 var r = SynthesisAudio(forntFileUrl, backgounedAudioIndex, taskName); Console.WriteLine(r.GetType()); Console.WriteLine(typeof(AudioSynthesisSyncResult)); if (r.GetType() == typeof(AudioSynthesisSyncResult)) { // 3:处理完成,应答队列服务器 channel.BasicAck(args.DeliveryTag, false); Console.WriteLine(taskName); Console.WriteLine("handler done, wait for the next message..."); } else { // 出现处理错误,则该条消息不做应答,并发送错误 var error = ((JsonResult) r); Console.WriteLine(error.StatusCode); Console.WriteLine(error.Value); } }; }View Code 当任务进入到消息队列,其实就和当时的请求是没有任何联系的了,这样来理解异步也不错,所以我们需要将任务的状态进行分类存储,以告诉客户端在查询的时候,当前的任务进行到哪一步了,我们可以用枚举的方式来罗列:
public enum AudioProcessingState { EmptyHandler = 0, StartHandler = 1, DownloadAudio = 2, SynthesisAudio = 3, UploadAudio = 4, UpdateDatabase = 5, HandlerException = 6, InCompleted = 7 }
笔者提供的任务状态有8种,具体时候请根据自己的业务逻辑进行区分,很简单,就是前面画的那张垂直流程图,不解释。
当然,如果你把所有任务状态都存到数据库,那么将会有个问题,这数据库面对轮询的压力有点吃力,所以最好还是放到缓存中,至于喜欢放什么缓存,这个根据业务场景和现有的而定,千万别放本地缓存就行。
对了,状态放缓存,而结果需要放数据库,这是原则问题。
客户端轮询结果接口
接下来我们在创建一个提供查询的接口,这里实际就是查询缓存而已,如果状态是InCompleted,就直接从数据库取结果,因为非常的简单,笔者就不放代码上来了。
不过有朋友喜欢将结果进行推送到客户端,这也是非常好的,而且相比轮询,推送更能减少服务器压力。
测试结果
为了验证结果,笔者前前后后进行了多次的测试,在I7-2700K的WIN10上面模拟了多台服务器,看看这截图:
能分离的全都分离,包括请求和查询也单列一台服务器。
经过测试,笔者通过模拟请求8个任务,采用逐级增加服务的方式,得到了如下的结果:
单机 | 最快(最早入队)/ms |
最慢(最晚入队)/ms |
第一次 | 3241 | 19430 |
第二次 | 3271 | 19592 |
第三次 | 4564 | 19227 |
两台 | ||
第一次 | 4058 | 9819 |
第二次 | 3146 | 9014 |
第三次 | 4033 | 8798 |
三台 | ||
第一次 | 3880 | 9830 |
第二次 | 3477 | 7700 |
第三次 | 3182 | 6993 |
六台 | ||
第一次 | 3709 | 4800 |
第二次 | 3313 | 4773 |
第三次 | 3182 | 4793 |
最早入队的任务时间基本锁定在3-4s,为何会有这么大的波动,毕竟笔者的电脑不是真正的服务器电脑。而反观最晚入队的任务,在单机模式上,达到了19s,随着逐级的增加服务(笔者电脑开6个已经吃不消了),达到了不到5s,整体时间缩短了近4倍,结果非常令人满意。
下一节将介绍在NETCORE中如何使用中间件自动启动任务调度,而不是采用quartz中间件。
感谢阅读
相关文章推荐
- 使用.NET Core搭建分布式音频效果处理服务(七)使用Docker压榨性能极限
- 使用.NET Core搭建分布式音频效果处理服务(六)让Middleware自动Invoke
- 使用.NET Core搭建RibbitMQ分布式音频效果处理服务(目录)
- C#编写Windows服务程序 (服务端),client使用 消息队列 实现淘宝 订单全链路效果
- C#编写Windows服务程序 (服务端),客户端使用 消息队列 实现淘宝 订单全链路效果
- PHP中利用redis实现消息队列处理高并发请求--简洁代码实现效果
- 使用消息队列+js实现分布式服务器热切换业务处理功能
- DIOCP开源项目-利用队列+0MQ+多进程逻辑处理,搭建稳定,高效,分布式的服务端
- 中小网站的福音:如何使用infBox消息服务提升竞争力
- 分布式消息队列的设计和使用
- 【阿里云产品公测】消息队列服务MQS使用分享
- MSMQ?不,太弱了。使用ActiveMQ实现消息队列服务
- 【转】NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例
- 【httpsqs】轻量级消息队列处理安装与使用
- 使用NODEJS+REDIS开发一个消息队列以及定时任务处理
- C#中使用消息队列服务
- 如何使用WCF服务实现分布式处理数据?
- 7月目标 socket , 一致性哈希算法 ; mongodb分片; 分布式消息队列; 中间件的使用场景
- 针对高并发,可扩展的互联网架构,搭建消息队列(一)