您的位置:首页 > 其它

使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性

2018-08-17 19:06 621 查看

 

public void DoStart()
{
// 1:从消息队列中取得需要处理的音频消息
Consumer consumer = new Consumer(MqConfig.MeidaQueueName);
var channel = consumer.Channel;
consumer.ReceivedEvent += (sender, args) =>
{
var msg = Encoding.UTF8.GetString(args.Body);
Console.WriteLine(args.RoutingKey + "\r\n" + msg);
Console.WriteLine();

// 2:执行同步处理(一次只调用一个同步处理单元)
var nonObj = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg);
var nonBoy = JsonConvert.DeserializeObject<dynamic>(nonObj["Body"].ToString());
string forntFileUrl = nonBoy.frontFileUrl;
int backgounedAudioIndex = nonBoy.backgounedAudioIndex;
string taskName = nonBoy.taskName;
// 调用同步方法
var r = SynthesisAudio(forntFileUrl, backgounedAudioIndex, taskName);
Console.WriteLine(r.GetType());
Console.WriteLine(typeof(AudioSynthesisSyncResult));
if (r.GetType() == typeof(AudioSynthesisSyncResult))
{
// 3:处理完成,应答队列服务器
channel.BasicAck(args.DeliveryTag, false);
Console.WriteLine(taskName);
Console.WriteLine("handler done, wait for the next message...");
}
else
{
// 出现处理错误,则该条消息不做应答,并发送错误
var error = ((JsonResult) r);
Console.WriteLine(error.StatusCode);
Console.WriteLine(error.Value);
}
};
}
View Code 当任务进入到消息队列,其实就和当时的请求是没有任何联系的了,这样来理解异步也不错,所以我们需要将任务的状态进行分类存储,以告诉客户端在查询的时候,当前的任务进行到哪一步了,我们可以用枚举的方式来罗列:

public enum AudioProcessingState
{
EmptyHandler = 0,
StartHandler = 1,
DownloadAudio = 2,
SynthesisAudio = 3,
UploadAudio = 4,
UpdateDatabase = 5,
HandlerException = 6,
InCompleted = 7
}

 

 笔者提供的任务状态有8种,具体时候请根据自己的业务逻辑进行区分,很简单,就是前面画的那张垂直流程图,不解释。

当然,如果你把所有任务状态都存到数据库,那么将会有个问题,这数据库面对轮询的压力有点吃力,所以最好还是放到缓存中,至于喜欢放什么缓存,这个根据业务场景和现有的而定,千万别放本地缓存就行。

对了,状态放缓存,而结果需要放数据库,这是原则问题。

 

客户端轮询结果接口

接下来我们在创建一个提供查询的接口,这里实际就是查询缓存而已,如果状态是InCompleted,就直接从数据库取结果,因为非常的简单,笔者就不放代码上来了。

不过有朋友喜欢将结果进行推送到客户端,这也是非常好的,而且相比轮询,推送更能减少服务器压力。

 

测试结果

为了验证结果,笔者前前后后进行了多次的测试,在I7-2700K的WIN10上面模拟了多台服务器,看看这截图:

能分离的全都分离,包括请求和查询也单列一台服务器。

经过测试,笔者通过模拟请求8个任务,采用逐级增加服务的方式,得到了如下的结果:

单机  最快(最早入队)/ms
 最慢(最晚入队)/ms
第一次  3241  19430
第二次  3271  19592
第三次  4564  19227
两台    
第一次  4058  9819
第二次  3146  9014
第三次  4033  8798
三台    
第一次  3880  9830
第二次  3477  7700
第三次  3182  6993
六台    
第一次  3709  4800
第二次  3313  4773
第三次  3182  4793

最早入队的任务时间基本锁定在3-4s,为何会有这么大的波动,毕竟笔者的电脑不是真正的服务器电脑。而反观最晚入队的任务,在单机模式上,达到了19s,随着逐级的增加服务(笔者电脑开6个已经吃不消了),达到了不到5s,整体时间缩短了近4倍,结果非常令人满意。

下一节将介绍在NETCORE中如何使用中间件自动启动任务调度,而不是采用quartz中间件。

 

感谢阅读

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐