您的位置:首页 > 编程语言 > C语言/C++

基于ACE的线程池学习与实现(一)——ACE_Task

2013-01-16 09:54 429 查看
这个主要是参考ACE官方文档写的

一、概述

ACE_Task 是ACE 中的任务或主动对象“处理结构”的基类。在ACE 中使用了此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须从此类派生。你也可以把ACE_Task看作是更高级的、更为面向对象的线程类。

ACE_Task处理的是对象,因而在构造OO程序时更便于思考。因此,在大多数情况下,当你需要构建多线程程序时,较好的选择是使用ACE_Task 的子类。这样做有若干好处。首要的是刚刚所提到的,这可以产生更好的OO软件。其次,你不必操心你的线程入口是否是静态的,因为ACE_Task 的入口是一个常规的成员函数。而且,我们会看到ACE_Task 还包括了一种用于与其他任务进行通信的易于使用的机制。

二、任务结构

ACE_Task 的结构在本质上与基于Acto r 的系统中的"Actor“的结构相类似。该结构如下所示:



每个任务都含有一或多个线程,以及一个底层消息队列。各个任务通过这些消息队列进行通信。但是,消息队列并非是程序员需要关注的对象。发送任务可以使用putq() 调用来将消息插入到另一任务的消息队列中。随后接收任务就可以通过使用getq () 调用来从它自己的消息队列里将消息提取出来。

因而,你可以设想一个系统,由多个自治的任务(或主动对象)构成,这些任务通过它们的消息队列相互通信。这样的体系结构有助于大大简化多线程程序的编程模型。

 

三、ACE_task的主要方法

1.实现服务初始化和终止方法:open()方法应该包含所有专属于任务的初始化代码。其中可能包括诸如连接控制块、锁和内存这样的资源。close() 方法是相应的终止方法。

2.调用启用(Activation )方法:在主动对象实例化后,你必须通过调用activate()启用它。要在主动对象中创建的线程的数目,以及其他一些参数,被传递给activate()方法。activate()方法会使svc()方法成为所有它生成的线程的启动点。 

3.实现服务专有的处理方法:如上面所提到的,在主动对象被启用后,各个新线程在svc()方法中启动。应用开发者必须在子类中定义此方法。

 

四、任务间通信

如前面所提到的,ACE 中的每个任务都有一个底层消息队列(参见上面的图示)。这个消息队列被用作任务间通信的一种方法。当一个任务想要与另一任务“谈话”时,它创建一个消息,并将此消息放入它想要与之谈话的任务的消息队列。接收任务通常用getq () 从消息队列里获取消息。如果队列中没有数据可用,它就进入休眠状态。如果有其他任务将消息插入它的队列,它就会苏醒过来,从队列中拾取数据并处理它。因而,在这种情况下,接收任务将从发送任务那里接收消息,并以应用特定的方式作出反馈。

下一个例子演示两个任务怎样使用它们的底层消息队列进行通信。这个例子包含了经典的生产者-消费者问题的实现。生产者任务生成数据,将它发送给消费者任务。消费者任务随后消费这个数据。使用ACE_Task 构造,我们可将生产者和消费者看作是不同的ACE_Task 类型的对象。这两种任务使用底层消息队列进行通信。

这里上具体代码

首先是Consumer.h

/*
* Consumer.h
*
*  Created on: Jan 8, 2013
*      Author: xing
*/

#ifndef CONSUMER_H_
#define CONSUMER_H_

#include "ace/Task.h"
#include "ace/Message_Block.h"

//The Consumer Task.
class Consumer:
public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

//Activate the Task
activate(THR_NEW_LWP,1);

return 0;
}

//The Service Processing routine
int svc(void)
{
//Get ready to receive message from Producer
ACE_Message_Block * mb =0;
do
{
mb=0;

//Get message from underlying queue
getq(mb);
ACE_DEBUG((LM_DEBUG,
"(%t)Got message: %d from remote task\n",*mb->rd_ptr()));
}while(*mb->rd_ptr()<10);

return 0;
}

int close(u_long)
{
ACE_DEBUG((LM_DEBUG,"Consumer closes down \n"));
return 0;
}
};

#endif /* CONSUMER_H_ */


Producter.h

/*
* Producter.h
*
*  Created on: Jan 8, 2013
*      Author: xing
*/

#ifndef PRODUCTER_H_
#define PRODUCTER_H_

#include "ace/Task.h"
#include "ace/Message_Block.h"
#include "Consumer.h"

class Producer:
public ACE_Task<ACE_MT_SYNCH>
{
public:
Producer(Consumer * consumer):
consumer_(consumer), data_(0)
{
mb_=new ACE_Message_Block((char*)&data_,sizeof(data_));
}

int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

//Activate the Task
activate(THR_NEW_LWP,1);
return 0;
}

//The Service Processing routine
int svc(void)
{
while(data_<11)
{
//Send message to consumer
ACE_DEBUG((LM_DEBUG,
"(%t)Sending message: %d to remote task\n",data_));
consumer_->putq(mb_);

//Go to sleep for a sec.
ACE_OS::sleep(1);
data_++;
}
return 0;
}

int close(u_long)
{
ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));
return 0;
}

private:
char data_;
Consumer * consumer_;
ACE_Message_Block * mb_;
};
#endif /* PRODUCTER_H_ */


main.cpp

#include "Consumer.h"
#include "Producter.h"

int main(int argc, char * argv[])
{
Consumer *consumer = new Consumer;
Producer *producer = new Producer(consumer);

producer->open(0);
consumer->open(0);
//Wait for all the tasks to exit.
ACE_Thread_Manager::instance()->wait();
}


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Ace C++ 线程