您的位置:首页 > 其它

消息队列在项目中的使用总结

2017-11-16 21:42 701 查看
一、传统通信模式的不足

常用的传统进程通信模式一般是client调用server的服务,等待server的响应。但是在网络情况不好或者在server需要较长的处理时间的时候,就可能导致client的调用失败或超时。业务场景中经常会有一些非常耗时的操作容易阻塞通信,就需要选择独立、耦合性低的消息中间件来完成业务系统间的交互和数据传递。

二、消息队列的优势

消息队列可以作为通信的中介,临时存放发送方信息,等待接收方领取。消息的发送者将消息放进消息队列后可以立即返回,不需要等待接收者的响应,消息会被保存在队列中,直到被接收者取出。消息队列的以下几个优点:

1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。

2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。

3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。

4、复用:一次发送多次消费

5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它;

6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。

 

三、消息队列在项目中的使用

在最近一个批量删除公会的需求里,由于批量删除操作涉及大量耗时操作,很容易造成超时和失败。我们通过使用消息队列,把删除请求和实际删除操作解耦开来, 异步处理。Server只负责把接受到删除的请求放进消息队列和限制用户的请求频率,请求放进MQ后迅速返回成功给前端,并不进行实际的业务删除操作。后台的daemon负责不断从消息队列中获取请求并执行删除操作,操作完成后通过公众号消息推送结果给用户。



四、Hippo消息队列的介绍与使用

1)Producer和Consumer在初始化的时候确定自己的Group和读取配置文件;producer发布topic,然后发送消息到该topic。Consumer先订阅某个topic,然后就可以从该topic取消息。这是消息队列的简化模型。



(Hippo系统实际存在四种角色,分别为生产者,消费者,存储层,中心控制节点,这里简化掉控制节点)

单个收发流程模型:



收发消息的基本模式:

对于一个Topic下的一条消息,属于同一个Group的不同consumer只能有一个可以消费,一个consumer消费后其他consumer就不能再消费了;



而不同Group下的consumer可以重复消费同一条消息,某个Group的consumer消费后不影响其他Group的consumer消费同一条消息。



P2P模式: 

所有的consumer都属于同一个Group;每个消息只能有一个消费者 。所有订阅了该topic的消费者都有资格取,先到先得。



publish/subscribe模式:

  每个consumer有自己的Group, 每个消息可以分发给多个消费者。



2)每个Topic都有n个Queue,topic通过负载均衡的算法以确定每个消费者具体消费哪个队列分区,分发队列的消息给consumer。consumer进行消费的过程中对队列分区是以独占的形式存在的,即一个队列在一个消费组中只能被一个消费者占有并消费,其他没分配队列的消费者只能轮空。为了保证消费的可靠对于每次拉取的数据,都需要consumer端在消费完成之后进行一次确认,否则下次拉取还是从原来的偏移量开始。为了保证可用性,在consumer拉取消息和确认回调这段锁定时间有一个超时机制,超过这个时间队列会自动解锁,被别的消费者消费。也可在配置文件设置为默认处理消息成功,而把处理失败的重试放到业务中实现。目前消息在队列可保存两天。



 3)异步接口和同步接口:producer和consumer都有提供同步接口和异步接口。对于异步模式,应用程序要根据自己业务逻辑写具体的回调处理函数;客户端不缓存消息,发送失败的消息包若业务要重发,则业务程序逻辑实现上要有缓存处理;异步模式下发送和拉取消息立即返回,结果通过回调函数返回。producer的发送接口若采用异步可提高一些性能。

 

五、hippo的申请步骤

1)上tdbank.oa.com登记提单申请,根据业务需要填写业务信息,数据信息选hippo接入,提交申请。

 


2)提交申请后等待相关负责人审核(审核联系gosonzhang,carylu,kennyjiang),拿到测试环境的topic和正式环境的topic与master_addr,就可以进行配置。

六、hippoclient配置文件

 


1)hippo_auto_confirm_in_getmessage作用是减少一次消息交互提高消费性能。为0表示需要主动调用confirm才会认为数据已消费,为1就是消费消息的时候连带确认之前信息已处理,但是最后一次消费请求需要业务手动确认。

2)hippo_consume_from_where有三个参数:-1,0,1。

为-1是指consumer可以接收到保存在所订阅topic的所有消息,不管有没有被消费过;

为0是指consumer可以接收所有还没被所属Group消费过消息;

为1是指consumer只接收订阅之后topic的最新消息。

3)hippo_master_addr是定义Hippo服务器Master地址的列表,该部分需要与Hippo业务负责人联系提供。

 

七、后台daemon的编写

为了让hippo的一些接口支持我们小组项目的协议,我们对hippo做了一些封装,支持了wup协议和jce编码,添加登录态存储,模调和流水。业务层使用过程中只需要调用SendMessage(放消息到MQ)和BlockGetMessage(从MQ拉取消息)两个接口。

收发消息前先要初始化三个公共参数:

conf_file 是hippoclient.conf(hippo的配置文件)的路径;

Group是客户端组名,按照设计生产和消费是由一组客户端完成,这里用于标记每个客户端所属的组;

Topic是消息的主题,一个Topic里有多个queue,一个queue可有多个message。

1)发送消息到MQ的函数:

string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;

//初始化producer
CGcHippoProducer producer(sConfFile, sGroup); 

STestReq stReq;
SUinSession stSession
//发送消息到MQ
iRet = producer.SendMessage(sTopic, stSession, stReq);
if (iRet != 0)
{
ERROR_LOG("consumer.GetMessage error, iRet:%d, sTopic:%s", iRet, sTopic.c_str());
return iRet;
}
//Session是登录态,stReq放进MQ的消息,支持jce和string类型。调用sendMessage把请求信息stReq放进MQ里。


2)从MQ中拉取消息的函数:

string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;

//初始化consumer
CGcHippoConsumer consumer(sConfFile, sGroup);
 
string sReq; 
SUinSession stSession;
//阻塞式拉取消息
iRet = consumer.BlockGetMessage(sTopic, stSession, _sReq);
if (iRet != 0)
{
ERROR_LOG("consumer.GetMessage error, iRet:%d, sTopic:%s", iRet, sTopic.c_str());
return iRet;
}
//stSession是登录态,sReq是从MQ里取得的消息,同时支持jce和string类型。调用BlockGetMessage接口阻塞地从MQ中取出登录态和消息。


3)使用进程池封装后的MQ函数:

由于hippo的并发能力受到配置的queue限制,拉取消息的consumer数量不能多于queue数(多余的consumer做无用功)。所以增加一个模型:consumer进程单纯拉取消息,放进共享内存,由进程池从共享内存中获取消息进行逻辑处理。一个consumer进程占用一个进程池,所以不同consumer进程使用的共享内存的配置文件名要不一样!进程池的封装实现:

 


//进程池的逻辑处理函数
int PLineDoJob(const char* pData,int iLen)
{
int iRet = 0;               
   iRet=DoJobWrap<STesteq>(*this, &CTest::DoLogic, pData, iLen);
   return 0;
}
//执行逻辑调用
int DoLogic(STestReq stReq, SUinSession stSession)
{
doSomething();
   return 0;
}
string sConfFile ="conf/hippoclient.conf";
string sGroup = “gc_league_cond_del”;
string sTopic = “gc_league_cond_del”;
//循环拉取信息放进本地内存消息队列
Int AddJobToShmCycle(sConfFile, sGroup, sTopic, iMsgNum)
{
CGcHippoConsumer consumer(sConfFile, sGroup);
while(!gbExitFlag)              
{
//循环拉取消息
TJceReq stReq;
SUinSession stSession;

iRet = consumer.BlockGetMessage(sTopic, stSession, stReq);
if (iRet != 0)
{
ERROR_LOG("consumer.GetMessage error,iRet:%d, sTopic:%s", iRet, sTopic.c_str());
return iRet;
}

//将消息放进本地内存消息队列,当队列满了就睡眠等待
iRet=PLineAddJob((char*)it->getData(),it->getDataLength());
//省略错误处理...
}


详细的原生接口信息请参考:

http://km.oa.com/group/20523/articles/show/230802

八、DNS的配置

现网上线需要配置DNS,参考文章:

http://km.oa.com/group/20523/docs/show/140446

修改配置需要root权限,可自己编写后置脚本使用织云的root用户权限下发到服务器,



配置完成验证ping tl-if-nn-tdw, 能ping通证明配置成功了。

 

九、在织云上打包和部署

1、首先在织云创建 后台server包

 


2、bin目录下上传daemon可执行文件



3、进程的一些配置



监控进程列表的gc_league_cond_del_daemon_new:2,20中,前面是进程名称,2,20是指保持进程数在2~20的范围。如果进程数超出范围,监控脚本会kill掉所有gc_league_cond_del_daemon_new进程,然后重启程序。

kill进程信号为停止进程发出的信号,为保证daemon程序安全退出,可为daemon注册一个信号处理函数,执行完一个操作再退出。

启动方式是启动daemon的脚本,可根据具体需要修改。

4、保存数据,提交版本

后面几个选项根据具体需要修改,



完成之后就可以保存提交了。

 


十、测试报告

1)QPS与consumer个数的关系

条件: topic队列数固定为4个;(理论上支持4个consumer进程同时并发执行) ,consumer单次拉取消息量固定为100;业务处理时间为10ms。

不同进程数下拉取消息和发送消息的QPS数据如下:

 


结论:

consumer数量在和topic的队列数相同的时候consumer拉取消息的qps接近峰值,之后consumer继续增加对qps的增长便不太明显了,因为消费者对队列是独占的,多于队列数的消费者进程都在做无用功。

最佳Consumer数= 队列数。

 

2) QPS与队列数的关系

条件:

consumer个数等于队列数,单次拉取消息量固定为10;



结论:

队列数和consumer数的越大,拉取消息qps越大,但不是线性增长。

 

 

3)QPS与单次拉取消息数的关系

条件:

topic队列数固定为4个;consumer个数固定为4个。



结论:

在进程数不变的情况下,单次拉取消息越多,拉取消息的QPS越大,增长速度变慢,并非线性关系。

 

4)QPS和进程池大小/业务处理时间的关系

条件:

consumer数量设置为4,和队列数一致,consumer每次拉取10条消息,每个consumer占用一个进程池。

下面是使用进程池后不同消费者数下的处理消息的QPS:



结论:

可看出处理消息耗时的瓶颈主要是业务处理总时间和拉取消息总时间,总耗时是业务总时间和拉取消息总时间的最大值,而拉取消息总时间是不可控的,通过调整进程数可减少业务总时间,使得总耗时接近拉取消息总时间,达到最佳,qps也就达到峰值,因此要根据业务处理时间确定进程池进程数。

每个consumer开一个进程池,

进程池进程数=处理时间*单次拉取数/单次拉取时间;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  消息中间件