您的位置:首页 > 其它

转 一个基于ACE的负载自适应万能线程池实现

2011-03-02 12:50 716 查看



在C++中要进行并发处理,不可避免要使用多线程,在传统的教科书中,大家都是采用最原始的多线程技术,应用逻辑和线程并发策略紧密绑定。
在一个典型的服务器程序中,客户端的请求往往包含了很多不同的逻辑命令,如在一个线程处理函数中,需要根据客户端的命令代码处理不同的业务逻辑:

int thrad_main(int cmd_id,char *data){
switch(cmd_id)
{
case 1:
...
break;
case 2:
...
break;
}
}

如此这般,业务处理逻辑和线程逻辑紧密耦合,这是一种很“丑陋”的代码。
如何通过一种优雅的方法,分离并发逻辑和业务逻辑,通过通用的并发框架,业务逻辑设计者只需要关心自己的逻辑代码,交给“线程池”去处理即可,而不需要去关心如何创建线程,等待线程结果这些琐碎的“小事”?

很简单,高手出招,必谈模式,下面是一种常用的并发模式,领导者/追随者线程池模型:




一组预先分配的线程中通过“互斥”锁来同步线程之间的行为,“线程”们通过“民主选举”选出一位代表“领导者”站在最前端接收请求,拿到“任务”后,就从
身后的候选“继任者”中选出一个线程代替自己作为“领导者”,自己则变成“工作者”就跑到后面默默去执行处理命令,这个“任务”是一个包含待处理数据和处
理逻辑的自说明性任务,也就是说所有的线程不必事先知道怎么处理接收到的任务,因为他所拿到的“任务包”中就包含了如何处理任务的说明。就像一个“代工工
厂”的工人一样,无需任何文化基础,会干活就行。
那如何实现自说明任务呢?我们定义了一种称为“Method_Request”的对象,它
包含一个接口“virtual int call (void) =
0;”,线程池接受的任务就是这种Method_Request对象的实例,比如一个通知线程池结束工作的Method_Request可以定义为如下的
类:

class ExitRequest : public ACE_Method_Request
{
public:
virtual int call (void){
return -1;  // Cause exit.
}
};

我们重载call接口,添加处理这个请求的逻辑代码,由于仅仅实现通知线程池结束工作的操作,我们返回一个特殊值“-1”,即可只是线城池:
“工作完成了,你赶快洗洗睡吧!”,线程池会检查Method_Request对象的返回值,如果是“0”就是处理正常完成,继续等待下一个任务,如果是
“-1”,就关闭所有线程。

再来一个复杂点的例子,派生的Method_Request不仅有处理逻辑,还包括了需要处理的数据:

1
2 class M2M_EventRequest : public ACE_Method_Request
3 {
4     // Lua解释器,每个事件使用自己单独的脚本上下文
5     LuaVM::ALEE_LuaService & m_svcs;
6     ALEE_ScriptList_t & m_cmds;
7
8     // 事件内容
9     std::string m_type_name;
xml_event_t m_xml_event;

// 调试信息
DebugInfo_ptr m_debug;

public:
M2M_EventRequest(
LuaVM::ALEE_LuaService & svcs,
ALEE_ScriptList_t &cmds,
string const & type_name,
xml_event_t event);

M2M_EventRequest(
LuaVM::ALEE_LuaService & svcs,
ALEE_ScriptList_t &cmds,
string const & type_name,
xml_event_t event,
DebugInfo_ptr debug);

virtual ~M2M_EventRequest (void);

virtual int call (void);
};

这个Method_Request的功能是,命令线程池调用Lua解析器处理一段脚本代码,详细逻辑就不解释了,仅仅是一个示例,我们的重点在于线程池的实现。
下面就公布这个“万能线程池的”实现,其实这是一个基于ACE的线程库实现的“领导者/追随者”模式,我在其基础上进行了改进,增加了自适应功能,可以根据请求队列的负载,自动调整线程池中的线程数目。
闲话少说,上代码,看得懂的童鞋恭喜你内力深厚,还望多提宝贵意见,看不懂得小盆友也可以努力学习,提高自己:
// LeaderFollower.h

1 #pragma once
2
3 #include "dllmain.h"
4 #include <map>
5 #include <ace/Synch.h>    // ACE_Thread_Mutex
6 #include <ace/Task.h>    // ACE_Task
7
8 // 线程状态
9 enum LF_Status_t
{
TH_LEADER_ACTIVE,
TH_FOLLOWER,
TH_WORKER,
TH_READY,
TH_STOP,
};

struct LF_StatusTime_t
{
LF_Status_t    status;
ACE_Time_Value working_tv;
ACE_Time_Value start_time;
ACE_Time_Value stop_time;
ACE_Time_Value work_start;
ACE_Time_Value work_time;
};

typedef std::map<ACE_thread_t,LF_StatusTime_t>  LF_StatusTimeList_t;

class LF_Follower;

// 领导者-追随者线程池 模式实现
class CPPXCORBA_API LeaderFollower
{
public:
LeaderFollower(void);
~LeaderFollower(void);

protected:
LF_Follower * make_follower(void);
int     become_leader(void);
int     elect_new_leader(void);
bool leader_active(void);
void set_active_leader(ACE_thread_t leader);

private:
ACE_Unbounded_Queue<LF_Follower*>   m_followers;
ACE_Thread_Mutex                    m_followers_lock;
ACE_Thread_Mutex                    m_leader_lock;
ACE_thread_t                        m_current_leader;

//////////////////////////////////////////////////////////////////////////
/// 线程池状态监控
public:
const LF_StatusTimeList_t & get_status(void) const;
const float get_load_rate(void) const;

protected:
void set_status(LF_Status_t status);
void set_worktime(ACE_Time_Value work_time);

private:
LF_StatusTimeList_t m_status_time_list;
ACE_Thread_Mutex    m_status_lock;
};


// LeaderFollower.cpp

1 #include "stdafx.h"
2 #include "LeaderFollower.h"
3 #include "../cppx.core/dllmain.h"
4
5 // 追随者标记
6 class LF_Follower
7 {
8     ACE_Condition<ACE_Thread_Mutex> m_cond;
9     ACE_thread_t                    m_owner;
10
11 public:
12     LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) {
13         m_owner = ACE_Thread::self();
14     }
15     int wait(void){
16         return m_cond.wait();
17     }
18     int signal(void){
19         return m_cond.signal();
20     }
21     ACE_thread_t owner(void){
22         return m_owner;
23     }
24
25 };
26
27 //////////////////////////////////////////////////////////////////////////
28 LeaderFollower::LeaderFollower(void) :
29 m_current_leader(0)
30 {
31 }
32
33 LeaderFollower::~LeaderFollower(void)
34 {
35 }
36
37 LF_Follower *
38 LeaderFollower::make_follower( void )
39 {
40     ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
41
42     LF_Follower *fw;
43     ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
44     m_followers.enqueue_tail(fw);
45     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
46     return fw;
47 }
48
49 int
50 LeaderFollower::become_leader( void )
51 {
52     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
53
54     if( leader_active() && m_current_leader != ACE_Thread::self() ){
55         while(leader_active()){
56             set_status(TH_FOLLOWER);
57             auto_ptr<LF_Follower> fw(make_follower());
58             fw->wait();         // Wait until told to do so.
59         }
60     }
61
62     // Mark yourself as the active leader.
63     set_active_leader(ACE_Thread::self());
64     set_status(TH_LEADER_ACTIVE);
65     //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
66     return 0;
67 }
68
69 int
70 LeaderFollower::elect_new_leader( void )
71 {
72     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
73
74     set_active_leader(0);
75
76     // Wake up a follower
77     if( !m_followers.is_empty() ){
78         ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
79
80         // Get the old follower.
81         LF_Follower *fw;
82         if( m_followers.dequeue_head(fw) != 0 )
83             return -1;
84
85         //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
86         return (fw->signal() == 0) ? 0 : -1;
87     }
88
89     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
90     return -1;
91 }
92
93 bool
94 LeaderFollower::leader_active( void )
95 {
96     return (m_current_leader != 0);
97 }
98
99 void
LeaderFollower::set_active_leader( ACE_thread_t leader )
{
m_current_leader = leader;
}

void LeaderFollower::set_worktime( ACE_Time_Value work_time )
{
ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
info.working_tv = work_time;
}

void LeaderFollower::set_status( LF_Status_t status )
{
ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
switch(status)
{
case TH_READY:
info.start_time = ACE_OS::gettimeofday();
break;
case TH_STOP:
info.stop_time = ACE_OS::gettimeofday();
break;
case TH_WORKER:
info.work_start = ACE_OS::gettimeofday();
break;
case TH_LEADER_ACTIVE:
case TH_FOLLOWER:
if( info.status == TH_WORKER )
info.work_time += ACE_OS::gettimeofday() - info.work_start;
break;
}
info.status = status;
}

const LF_StatusTimeList_t &
LeaderFollower::get_status( void ) const
{
return m_status_time_list;
}

const float
LeaderFollower::get_load_rate( void ) const
{
ACE_Time_Value work_time,run_time;
foreach(const LF_StatusTimeList_t::value_type & info,get_status()){
if( info.second.status != TH_STOP ){
work_time += info.second.work_time;
run_time += ACE_OS::gettimeofday() - info.second.start_time;
}
}
return (float)work_time.usec()/run_time.usec()*100;
}


// LF_ThreadPool.h

1 #pragma once
2
3 #include "LeaderFollower.h"
4
5 #include <ace/Task.h>
6 #include <ace/Activation_Queue.h>
7 #include <ace/Method_Request.h>
8
9 class CPPXCORBA_API LF_ThreadPool :
public ACE_Task_Base,
public LeaderFollower
{
class ExitRequest : public ACE_Method_Request
{
public:
virtual int call (void){
return -1;  // Cause exit.
}
};

bool m_bShutdown;
bool m_bRunning;
ACE_Activation_Queue m_activation_queue_;

static const size_t ScheduleTime = 10;
static const size_t MinThreadNum = 10;
static const size_t MaxThreadNum = 20;

public:
LF_ThreadPool(void);
~LF_ThreadPool(void);

virtual int svc(void);

int start_stread_pool( void );
int stop_thread_pool( void );
int post_request( ACE_Method_Request *request );

int get_queue_load(void){ return m_activation_queue_.method_count(); }
int get_max_thread(void){ return MaxThreadNum; }
int get_min_thread(void){ return MinThreadNum; }

private:
int _fork_new_thread( void );
int _post_exit_request(void);
};


// LF_ThreadPool.cpp

1 #include "stdafx.h"
2 #include "LF_ThreadPool.h"
3
4 LF_ThreadPool::LF_ThreadPool(void) :
5 m_bShutdown(false),
6 m_bRunning(false)
7 {
8 }
9
10 LF_ThreadPool::~LF_ThreadPool(void)
11 {
12 }
13
14 int LF_ThreadPool::svc( void )
15 {
16     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
17
18     // 线程开始运行
19     m_bRunning = true;
20
21     set_status(TH_READY);
22
23     while(true){
24         // Block until this thread is the leader.
25         become_leader();
26
27         // 设置线程空闲时间,空闲线程将会自动退出
28         ACE_Time_Value tv(ScheduleTime);
29         tv += ACE_OS::gettimeofday();
30
31         // 从队列获取下一个请求,并获得所有权
32         auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
33         if( request.get() == 0 ){                                               // 长时间没有请求,dequeue超时返回
34             if( elect_new_leader() == 0 && thr_count() > MinThreadNum )         // 成功选择新的领导者,且工作线程数大于最少线程数
35                 break;                                                          // 结束当前线程
36             if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum )      // 工作线程数小于最少线程数,创建新的线程
37                 _fork_new_thread();
38             continue;                                                           // 继续担当领导者(优先成为领导者),或返回线程池等待
39         }
40
41         // Elect a new leader then process the request
42         if( elect_new_leader() != 0 || thr_count() < MinThreadNum )             // 没有空余线程可成为领导者,或者线程池容量调整
43             if( !m_bShutdown )                                                  // 且没有调度关闭
44                 if( thr_count() < MaxThreadNum )                                // 未达到线程数上线
45                     _fork_new_thread();                                         // 创建新的线程
46
47         // Invoke the method request.
48         set_status(TH_WORKER);
49
50         ACE_Time_Value tv_start,tv_finish,tv_working;
51         tv_start = ACE_OS::gettimeofday();
52
53         int result = request->call();
54
55         tv_finish = ACE_OS::gettimeofday();
56         tv_working = tv_finish - tv_start;
57         set_worktime(tv_working);
58
59         if( result == -1 ){
60             if( thr_count() > 1 )                                                // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
61                 _post_exit_request();
62             break;
63         }
64     }
65
66     // 剩下最后一个线程,线程池停止
67     if( thr_count() == 1 )
68         m_bRunning = false;
69
70     set_status(TH_STOP);
71     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1));
72     return 0;
73 }
74
75 int LF_ThreadPool::start_stread_pool( void )
76 {
77     m_bShutdown = false;
78     return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
79 }
80
81 int LF_ThreadPool::stop_thread_pool( void )
82 {
83     // 线程池已停止
84     if( !m_bRunning )
85         return 0;
86
87     m_bShutdown = true;
88     _post_exit_request();
89     return wait();
90 }
91
92 int LF_ThreadPool::post_request( ACE_Method_Request *request )
93 {
94     ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
95     return m_activation_queue_.enqueue (request);
96 }
97
98 int LF_ThreadPool::_fork_new_thread( void )
99 {
return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
}

int LF_ThreadPool::_post_exit_request( void )
{
return post_request(new ExitRequest);
}


怎么样?很简单吧?什么?怎么用?Oh My Lady GaGa!还是告诉你吧:

m_pool.post_request(new M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event,*iter));

需要线程池出来干活的时候,创建一个请求对象,扔给他就行了!

好了,代码就是最好的文档,C++开源社区给了我成长的土壤,希望能对后来者有所帮助。
把这些东西贴出来,是为了整理自己的大脑,免得这些曾经顶着熊猫眼熬出来的东西,尘封在茫茫的代码海洋中,取之于前辈,还之于后人。也希望有更多的高手能够慷慨布道,壮大我们的C++社区。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: