您的位置:首页 > 其它

用ACE实现生产者-消费者模式

2016-07-21 11:12 344 查看
      ACE_Task是ACE中的任务或主动对象“处理结构”的基类。ACE使用此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须由此类派生。同时可将它看作是更高级的、更为面向对象的线程。

ACE_Task处理的是对象,因此更有利于构造OO程序,产生更好的OO软件,而且,它还包括了一种用于与其他任务通信的易于使用的机制。

ACE_Task可用作:

<1>更高级的线程(常称其为任务)

<2>主动对象模式中的主动对象

ACE任务:

每个任务都含有一或多个线程,以及一个底层消息队列。各个任务通过消息队列进行通信。至于消息队列实现的内在细节程序员不必关注。发送任务用putq() 将消息插入到另一任务的消息队列中,接收任务通过使用getq()将消息提取出来。这样的体系结构大大简化了多线程程序的编程模型。

要实现一个ACE_Task,首先写一个类派生自ACE_Task,通常要实现三个方法。

1.实现服务初始化和终止方法。open(),用于包含所有专属于任务的初始化代码,其中可能包含诸如连接控制块,锁和内存这样的资源。close()是对应的终止方法。

2,调用启动方法,对象实例化后,必须调用activate()启用它。要在主动对象中创建的线程数目,以及一些参数,被传递给activate()方法。activate()方法会使svc()方法成为它生成线程的启动点。

3,svc()方法,各个新线程在svc()方法中启动。开发者在子类定义此方法。

举个例子:

consumer.h

#include "ace/Task.h"
#include "ace/Message_Block.h"
#include "ace/Log_Msg.h"

class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void *)
{
ACE_DEBUG((LM_DEBUG,"(%t) Producer task opened\n"));
activate(THR_NEW_LWP,1);
return 0;
}
int svc(void)
{
ACE_Message_Block *mb=0;
do
{
mb=0;
getq(mb);
ACE_DEBUG((LM_DEBUG,
"(%t)Got message: %d from remote task\n",*mb->rd_ptr()));

}while(*mb->rd_ptr()<10);
return 0;
}
int close(u_long)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("Consumer closes down\n")));
return 0;
}
};
       producter.h

 
#include "ace/Task.h"
#include "ace/Message_Block.h"
#include "ace/Log_Msg.h"
#include "consumer.h"

class Producer:
public ACE_Task<ACE_MT_SYNCH>
{
public:
Producer(Consumer * consumer):
consumer_(consumer), data_(0)
{
mb_=new ACE_Message_Block((char*)&data_,sizeof(data_));
}

int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

//Activate the Task
activate(THR_NEW_LWP,1);
return 0;
}

//The Service Processing routine
int svc(void)
{
while(data_<11)
{
//Send message to consumer
ACE_DEBUG((LM_DEBUG,
"(%t)Sending message: %d to remote task\n",data_));
consumer_->putq(mb_);

//Go to sleep for a sec.
ACE_OS::sleep(1);
data_++;
}
return 0;
}

int close(u_long)
{
ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));
return 0;
}

private:
char data_;
Consumer * consumer_;
ACE_Message_Block * mb_;
};
      main.cpp

#include "consumer.h"
#include "producter.h"

int main(int argc, char * argv[])
{
Consumer *consumer = new Consumer;
Producer *producer = new Producer(consumer);

producer->open(0);
consumer->open(0);
//Wait for all the tasks to exit.
ACE_Thread_Manager::instance()->wait();
}
       说明一下activate函数,其THR_NEW_LWP是创建一个内核级线程。对于没有绑定的线程来说。是添加一个新的内核线程到线程池。

本文参考http://blog.csdn.net/luson_xing/article/details/8508057。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: