转 一个基于ACE的负载自适应万能线程池实现
2011-03-02 12:50
716 查看
在C++中要进行并发处理,不可避免要使用多线程,在传统的教科书中,大家都是采用最原始的多线程技术,应用逻辑和线程并发策略紧密绑定。
在一个典型的服务器程序中,客户端的请求往往包含了很多不同的逻辑命令,如在一个线程处理函数中,需要根据客户端的命令代码处理不同的业务逻辑:
int thrad_main(int cmd_id,char *data){
switch(cmd_id)
{
case 1:
...
break;
case 2:
...
break;
}
}
如此这般,业务处理逻辑和线程逻辑紧密耦合,这是一种很“丑陋”的代码。
如何通过一种优雅的方法,分离并发逻辑和业务逻辑,通过通用的并发框架,业务逻辑设计者只需要关心自己的逻辑代码,交给“线程池”去处理即可,而不需要去关心如何创建线程,等待线程结果这些琐碎的“小事”?
很简单,高手出招,必谈模式,下面是一种常用的并发模式,领导者/追随者线程池模型:
在
一组预先分配的线程中通过“互斥”锁来同步线程之间的行为,“线程”们通过“民主选举”选出一位代表“领导者”站在最前端接收请求,拿到“任务”后,就从
身后的候选“继任者”中选出一个线程代替自己作为“领导者”,自己则变成“工作者”就跑到后面默默去执行处理命令,这个“任务”是一个包含待处理数据和处
理逻辑的自说明性任务,也就是说所有的线程不必事先知道怎么处理接收到的任务,因为他所拿到的“任务包”中就包含了如何处理任务的说明。就像一个“代工工
厂”的工人一样,无需任何文化基础,会干活就行。
那如何实现自说明任务呢?我们定义了一种称为“Method_Request”的对象,它
包含一个接口“virtual int call (void) =
0;”,线程池接受的任务就是这种Method_Request对象的实例,比如一个通知线程池结束工作的Method_Request可以定义为如下的
类:
class ExitRequest : public ACE_Method_Request { public: virtual int call (void){ return -1; // Cause exit. } };
我们重载call接口,添加处理这个请求的逻辑代码,由于仅仅实现通知线程池结束工作的操作,我们返回一个特殊值“-1”,即可只是线城池:
“工作完成了,你赶快洗洗睡吧!”,线程池会检查Method_Request对象的返回值,如果是“0”就是处理正常完成,继续等待下一个任务,如果是
“-1”,就关闭所有线程。
再来一个复杂点的例子,派生的Method_Request不仅有处理逻辑,还包括了需要处理的数据:
1 2 class M2M_EventRequest : public ACE_Method_Request 3 { 4 // Lua解释器,每个事件使用自己单独的脚本上下文 5 LuaVM::ALEE_LuaService & m_svcs; 6 ALEE_ScriptList_t & m_cmds; 7 8 // 事件内容 9 std::string m_type_name; xml_event_t m_xml_event; // 调试信息 DebugInfo_ptr m_debug; public: M2M_EventRequest( LuaVM::ALEE_LuaService & svcs, ALEE_ScriptList_t &cmds, string const & type_name, xml_event_t event); M2M_EventRequest( LuaVM::ALEE_LuaService & svcs, ALEE_ScriptList_t &cmds, string const & type_name, xml_event_t event, DebugInfo_ptr debug); virtual ~M2M_EventRequest (void); virtual int call (void); };
这个Method_Request的功能是,命令线程池调用Lua解析器处理一段脚本代码,详细逻辑就不解释了,仅仅是一个示例,我们的重点在于线程池的实现。
下面就公布这个“万能线程池的”实现,其实这是一个基于ACE的线程库实现的“领导者/追随者”模式,我在其基础上进行了改进,增加了自适应功能,可以根据请求队列的负载,自动调整线程池中的线程数目。
闲话少说,上代码,看得懂的童鞋恭喜你内力深厚,还望多提宝贵意见,看不懂得小盆友也可以努力学习,提高自己:
// LeaderFollower.h
1 #pragma once 2 3 #include "dllmain.h" 4 #include <map> 5 #include <ace/Synch.h> // ACE_Thread_Mutex 6 #include <ace/Task.h> // ACE_Task 7 8 // 线程状态 9 enum LF_Status_t { TH_LEADER_ACTIVE, TH_FOLLOWER, TH_WORKER, TH_READY, TH_STOP, }; struct LF_StatusTime_t { LF_Status_t status; ACE_Time_Value working_tv; ACE_Time_Value start_time; ACE_Time_Value stop_time; ACE_Time_Value work_start; ACE_Time_Value work_time; }; typedef std::map<ACE_thread_t,LF_StatusTime_t> LF_StatusTimeList_t; class LF_Follower; // 领导者-追随者线程池 模式实现 class CPPXCORBA_API LeaderFollower { public: LeaderFollower(void); ~LeaderFollower(void); protected: LF_Follower * make_follower(void); int become_leader(void); int elect_new_leader(void); bool leader_active(void); void set_active_leader(ACE_thread_t leader); private: ACE_Unbounded_Queue<LF_Follower*> m_followers; ACE_Thread_Mutex m_followers_lock; ACE_Thread_Mutex m_leader_lock; ACE_thread_t m_current_leader; ////////////////////////////////////////////////////////////////////////// /// 线程池状态监控 public: const LF_StatusTimeList_t & get_status(void) const; const float get_load_rate(void) const; protected: void set_status(LF_Status_t status); void set_worktime(ACE_Time_Value work_time); private: LF_StatusTimeList_t m_status_time_list; ACE_Thread_Mutex m_status_lock; };
// LeaderFollower.cpp
1 #include "stdafx.h" 2 #include "LeaderFollower.h" 3 #include "../cppx.core/dllmain.h" 4 5 // 追随者标记 6 class LF_Follower 7 { 8 ACE_Condition<ACE_Thread_Mutex> m_cond; 9 ACE_thread_t m_owner; 10 11 public: 12 LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) { 13 m_owner = ACE_Thread::self(); 14 } 15 int wait(void){ 16 return m_cond.wait(); 17 } 18 int signal(void){ 19 return m_cond.signal(); 20 } 21 ACE_thread_t owner(void){ 22 return m_owner; 23 } 24 25 }; 26 27 ////////////////////////////////////////////////////////////////////////// 28 LeaderFollower::LeaderFollower(void) : 29 m_current_leader(0) 30 { 31 } 32 33 LeaderFollower::~LeaderFollower(void) 34 { 35 } 36 37 LF_Follower * 38 LeaderFollower::make_follower( void ) 39 { 40 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0); 41 42 LF_Follower *fw; 43 ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0); 44 m_followers.enqueue_tail(fw); 45 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size())); 46 return fw; 47 } 48 49 int 50 LeaderFollower::become_leader( void ) 51 { 52 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1); 53 54 if( leader_active() && m_current_leader != ACE_Thread::self() ){ 55 while(leader_active()){ 56 set_status(TH_FOLLOWER); 57 auto_ptr<LF_Follower> fw(make_follower()); 58 fw->wait(); // Wait until told to do so. 59 } 60 } 61 62 // Mark yourself as the active leader. 63 set_active_leader(ACE_Thread::self()); 64 set_status(TH_LEADER_ACTIVE); 65 //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n"))); 66 return 0; 67 } 68 69 int 70 LeaderFollower::elect_new_leader( void ) 71 { 72 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1); 73 74 set_active_leader(0); 75 76 // Wake up a follower 77 if( !m_followers.is_empty() ){ 78 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1); 79 80 // Get the old follower. 81 LF_Follower *fw; 82 if( m_followers.dequeue_head(fw) != 0 ) 83 return -1; 84 85 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner())); 86 return (fw->signal() == 0) ? 0 : -1; 87 } 88 89 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n"))); 90 return -1; 91 } 92 93 bool 94 LeaderFollower::leader_active( void ) 95 { 96 return (m_current_leader != 0); 97 } 98 99 void LeaderFollower::set_active_leader( ACE_thread_t leader ) { m_current_leader = leader; } void LeaderFollower::set_worktime( ACE_Time_Value work_time ) { ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock); LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()]; info.working_tv = work_time; } void LeaderFollower::set_status( LF_Status_t status ) { ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock); LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()]; switch(status) { case TH_READY: info.start_time = ACE_OS::gettimeofday(); break; case TH_STOP: info.stop_time = ACE_OS::gettimeofday(); break; case TH_WORKER: info.work_start = ACE_OS::gettimeofday(); break; case TH_LEADER_ACTIVE: case TH_FOLLOWER: if( info.status == TH_WORKER ) info.work_time += ACE_OS::gettimeofday() - info.work_start; break; } info.status = status; } const LF_StatusTimeList_t & LeaderFollower::get_status( void ) const { return m_status_time_list; } const float LeaderFollower::get_load_rate( void ) const { ACE_Time_Value work_time,run_time; foreach(const LF_StatusTimeList_t::value_type & info,get_status()){ if( info.second.status != TH_STOP ){ work_time += info.second.work_time; run_time += ACE_OS::gettimeofday() - info.second.start_time; } } return (float)work_time.usec()/run_time.usec()*100; }
// LF_ThreadPool.h
1 #pragma once 2 3 #include "LeaderFollower.h" 4 5 #include <ace/Task.h> 6 #include <ace/Activation_Queue.h> 7 #include <ace/Method_Request.h> 8 9 class CPPXCORBA_API LF_ThreadPool : public ACE_Task_Base, public LeaderFollower { class ExitRequest : public ACE_Method_Request { public: virtual int call (void){ return -1; // Cause exit. } }; bool m_bShutdown; bool m_bRunning; ACE_Activation_Queue m_activation_queue_; static const size_t ScheduleTime = 10; static const size_t MinThreadNum = 10; static const size_t MaxThreadNum = 20; public: LF_ThreadPool(void); ~LF_ThreadPool(void); virtual int svc(void); int start_stread_pool( void ); int stop_thread_pool( void ); int post_request( ACE_Method_Request *request ); int get_queue_load(void){ return m_activation_queue_.method_count(); } int get_max_thread(void){ return MaxThreadNum; } int get_min_thread(void){ return MinThreadNum; } private: int _fork_new_thread( void ); int _post_exit_request(void); };
// LF_ThreadPool.cpp
1 #include "stdafx.h" 2 #include "LF_ThreadPool.h" 3 4 LF_ThreadPool::LF_ThreadPool(void) : 5 m_bShutdown(false), 6 m_bRunning(false) 7 { 8 } 9 10 LF_ThreadPool::~LF_ThreadPool(void) 11 { 12 } 13 14 int LF_ThreadPool::svc( void ) 15 { 16 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count())); 17 18 // 线程开始运行 19 m_bRunning = true; 20 21 set_status(TH_READY); 22 23 while(true){ 24 // Block until this thread is the leader. 25 become_leader(); 26 27 // 设置线程空闲时间,空闲线程将会自动退出 28 ACE_Time_Value tv(ScheduleTime); 29 tv += ACE_OS::gettimeofday(); 30 31 // 从队列获取下一个请求,并获得所有权 32 auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv)); 33 if( request.get() == 0 ){ // 长时间没有请求,dequeue超时返回 34 if( elect_new_leader() == 0 && thr_count() > MinThreadNum ) // 成功选择新的领导者,且工作线程数大于最少线程数 35 break; // 结束当前线程 36 if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum ) // 工作线程数小于最少线程数,创建新的线程 37 _fork_new_thread(); 38 continue; // 继续担当领导者(优先成为领导者),或返回线程池等待 39 } 40 41 // Elect a new leader then process the request 42 if( elect_new_leader() != 0 || thr_count() < MinThreadNum ) // 没有空余线程可成为领导者,或者线程池容量调整 43 if( !m_bShutdown ) // 且没有调度关闭 44 if( thr_count() < MaxThreadNum ) // 未达到线程数上线 45 _fork_new_thread(); // 创建新的线程 46 47 // Invoke the method request. 48 set_status(TH_WORKER); 49 50 ACE_Time_Value tv_start,tv_finish,tv_working; 51 tv_start = ACE_OS::gettimeofday(); 52 53 int result = request->call(); 54 55 tv_finish = ACE_OS::gettimeofday(); 56 tv_working = tv_finish - tv_start; 57 set_worktime(tv_working); 58 59 if( result == -1 ){ 60 if( thr_count() > 1 ) // If received a ExitMethod, Notify the next Thread(if exists) to exit too. 61 _post_exit_request(); 62 break; 63 } 64 } 65 66 // 剩下最后一个线程,线程池停止 67 if( thr_count() == 1 ) 68 m_bRunning = false; 69 70 set_status(TH_STOP); 71 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1)); 72 return 0; 73 } 74 75 int LF_ThreadPool::start_stread_pool( void ) 76 { 77 m_bShutdown = false; 78 return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum); 79 } 80 81 int LF_ThreadPool::stop_thread_pool( void ) 82 { 83 // 线程池已停止 84 if( !m_bRunning ) 85 return 0; 86 87 m_bShutdown = true; 88 _post_exit_request(); 89 return wait(); 90 } 91 92 int LF_ThreadPool::post_request( ACE_Method_Request *request ) 93 { 94 ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue")); 95 return m_activation_queue_.enqueue (request); 96 } 97 98 int LF_ThreadPool::_fork_new_thread( void ) 99 { return activate(THR_NEW_LWP| THR_JOINABLE,1,1); } int LF_ThreadPool::_post_exit_request( void ) { return post_request(new ExitRequest); }
怎么样?很简单吧?什么?怎么用?Oh My Lady GaGa!还是告诉你吧:
m_pool.post_request(new M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event,*iter));
需要线程池出来干活的时候,创建一个请求对象,扔给他就行了!
好了,代码就是最好的文档,C++开源社区给了我成长的土壤,希望能对后来者有所帮助。
把这些东西贴出来,是为了整理自己的大脑,免得这些曾经顶着熊猫眼熬出来的东西,尘封在茫茫的代码海洋中,取之于前辈,还之于后人。也希望有更多的高手能够慷慨布道,壮大我们的C++社区。
相关文章推荐
- 一个基于ACE的线程池的实现(转)
- 在DNS中为一个域名配置多个IP地址实现负载和分流---基于WINDOWS服务器
- 《你必须知道的.NET》读书实践:一个基于OO的万能加载器的实现
- 基于ACE的线程池学习与实现(二)——ACE_Message_Block与ACE_Condition
- 基于ACE的线程池学习与实现(三)—— 并发编程资料
- 基于ACE实现的一个内存池-续篇
- 在DNS中为一个域名配置多个IP地址实现负载和分流---基于WINDOWS服务器
- C++实现的带最大最小线程数的线程池(基于ACE)
- 一个C++基于boost简单实现的线程池
- 基于ACE的线程池学习与实现(一)——ACE_Task
- C++实现的带最大最小线程数的线程池(基于ACE)
- 一个简单的java线程池实现
- Rhyme/ 手写服务器 实现一个基于xml解析的简单的Tomcat服务器
- Qt: 基于qextserialport实现的一个小串口工具
- 随笔之如何实现一个线程池
- 写出一个你自己的MVC框架-基于对springMVC源码实现和理解(5):数据初始化(四)
- 我用 tensorflow 实现的“一个神经聊天模型”:一个基于深度学习的聊天机器人
- 基于自适应PSO 的RBF网络分类算法实现
- 一种基于HAProxy实现RMI负载均衡的方法
- 写出一个你自己的MVC框架-基于对springMVC源码实现和理解(4):数据初始化(三)