您的位置:首页 > 其它

多线程消息队列框架的设计(很好很实用的设计思想)

2014-05-04 23:44 751 查看


多线程消息队列有广泛的应用场合。任何程序都是为了完成某个业务活动。有时候业务活动的发起者只知道要完成某个业务活动,而不知道或者不关心这个业务活动何时完成以及如何完成;有时候业务活动的实际处理者只知道如何处理自己熟知的业务活动,而不关心该业务活动何时发起。消息队列为这种应用提供了一种比较好的解决方案。

外部接口

多线程消息队列框架的使用者关注以下几件事情:

如何发起一个业务活动,即如何发送一个消息?

如何将自己的业务处理程序挂接到消息队列框架中,以便适时的处理业务活动?

消息管理中心(MessageCenter)

Interface IBMessageCenter

{

HRESULT RegisterMessageDispatcher([in]IBMessageDispatcher * dispatcher);

HRESULT RegisterMessageWorker([in]IBMessageWorker * worker);

HRESULT PostMessage([in]BosMessage * msg);

HRESULT Start();

HRESULT Stop();

};

RegisterMessageDispatcher方法注册消息派发器

RegisterMessageWorker方法负责注册消息接收器

PostMessage方法发送一个消息

Start方法启动消息队列系统

Stop方法停止消息队列系统

消息派发器(MessageDispatcher)

Interface IBMessageDispatcher

{

HRESULT PostMessage([in]BosMessage * msg);

HRESULT RegisterMessage([in]long msgid);

HRESULT RegisterMessageWorker([in]long msgid, [in]IBMessageWorker * worker);

HRESULT RegisterMessageHandler([in]long msgid, [in]IBMessageHandler *handler);

HRESULT MessageWorkerSelector([in] IBMessageWorkerSelector * newVal);

};

PostMessage方法负责派发一个消息。

RegisterMessage方法注册一个消息,未注册的消息不会派发,因为也不知道往哪里派发。

RegisterMessageWorker方法为指定的消息注册一个消息接收器,可为同一个消息注册多个消息接收器,原因后面再讲。

RegisterMessageHandler方法为指定的消息注册一个消息处理器,一个消息注册了多个消息处理器时,只保留最后一个。

MessageWorkerSelector方法负责指定消息接收器的选择方式。因为一个消息可以有多个消息接收器,那么可能有必要为一个具体的消息选择最合适的消息接收器。

消息接收器(MessageWorker)

interface IBMessageWorker

{

HRESULT PostMessage([in]BosMessage * msg);

HRESULT Start();

HRESULT Stop();

};

PostMessage方法负责将消息投递到消息队列中。

Start方法负责启动消息循环工作。

Stop方法负责停止消息循环工作。

消息处理器(MessageHandler)

interface IBMessageHandler

{

HRESULT ProcessMessage([in]BosMessage * msg);

};

ProcessMessage方法负责处理具体的消息。

内部实现

消息队列框架内部由以下几个部分组成。

消息管理中心(MessageCenter)

消息管理中心提供启动和停止消息队列系统的方法以及发送消息的方法。消息管理中心还负责注册消息派发器和消息接收器。

消息派发器(MessageDispatcher)

消息派发器负责将为每一个消息指定其消息处理器,然后将消息派发到合适的消息接收器。

消息接收器(MessageWorker)

消息接收器负责将接收到的消息存储到消息队列中,并且调用合适的消息处理器来处理消息队列中的消息。

消息接收器包含一个消息队列,并有一个专门工作线程负责读取队列中的消息。

消息队列需要一个互斥量以保证数据的一致性。这是这个多线程消息队列框架在运行过程中唯一需要同步的地方。

消息处理器(MessageHandler)

消息处理器负责处理具体的消息。用户需要根据消息处理器的接口来实现自定义的消息处理器,以便将消息系统与用户的实际业务关联起来。

设计总结

这里解释一下为什么同一个消息可以注册多个消息接收器,以及为什么提出消息接收器的选择器的概念。

降低消息处理器之间的相互影响

消息最终会在哪里被处理呢?消息最终会在消息接收器的工作线程中被处理。

同一个消息,指的是消息ID相同的消息,派发到哪一个消息接收器,是灵活的,因为同一个消息可能被派发到不同的消息接收器,不同的消息可能被派发到同一个消息接收器; 又是有原则,因为同一个消息只可能被派发到RegisterMessageWorker方法为该消息指定的若干消息接收器。 这样,如果这个消息的处理很费时或者消息个数非常多,那么可以为其多指定几个不同的消息接收器,从而可以提高这个消息的并行处理程度,如果这个消息的处理非常迅速或者消息个数很少,那么甚至可以将其与其它消息共享同一个消息接收器。这样既保证了消息处理的最大并行度,降低了消息处理器之间的相互影响,又不浪费任何消息接收器的处理能力。

对象临界资源的无锁访问

这一点其实是这项设计提供的附产品。

消息的处理最终会引起某一个目标对象的部分状态也就是它的成员数据的变化。在多线程环境下,这部分状态需要提供互斥访问的机制,比如使用互斥量(MUTEX)。如果这样的对象的数量很少,那么使用互斥量也无妨。但是如果对象的数量巨大,每个对象都使用一个甚至多个互斥量,那么不但效率低下而且创建太多系统核心对象也不太好。

每一个消息的处理总是跟特定的一部分状态的变化相关联的。如果我们能够保证每一个跟同一个具体对象关联的消息,都在同一个消息接收器中被处理,也就是在同一个线程中被处理,那么这个对象的相应的状态就不需要互斥访问机制的保护了,也就是实现了临界资源的无锁访问。

比如发往对象OBJECT1的消息ID为1000的消息,处理时会改变OBJECT1的m_status状态,那么如果这个消息在某个固定的线程中被处理,那么m_status状态就不需要互斥量(MUTEX)的保护。

这个消息队列框架中,消息接收器的选择器就可根据需要确保发往每个对象的同一个消息,都在同一个消息接收器中被处理,只有简单的一个表达式:

worker_index = OBJECT_ID mod WORKER_COUNT_OF_MSGID;

这里OBJECT_ID就是消息的目标对象的对象ID, WORKER_COUNT_OF_MSGID是为这个消息注册的消息接收器的数量, mod是求余运算, Worker_index就是选择的消息接收器的编号。

当然,还可以根据实际需要,设计其它的消息接收器的选择器。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: