C++实现的带最大最小线程数的线程池(基于ACE)
2010-01-03 13:06
597 查看
最近需要一个能根据请求数变化的线程池,JAVA有这样的东西,可是C++下好像一般只是固定大小的线程池。所以就基于ACE写了个,只做了初步测试。
主要思想是:
1. 重载ACE_Task,这相当于是个固定线程池,用一个信号量(ACE_Thread_Semaphore)来记数空闲线程数。
2. 初始化时根据用户的输入,确定最少线程数minnum和最大线程数maxnum,当多个请求到来,并且无空闲线程(信号量用光),判断总线程数小于maxnum,就开始强迫增加线程数。
3. 当线程响应完一个请求(任务)后,如果当前任务队列为空,且线程数大于minnum,就退出本线程。这里做了一个优化,就算满足条件,线程也会在队列上再等待10秒,防止线程池抖动带来不必要的开销。
使用:
重载这个类,重载service_func函数实现自己的任务处理。
start_pool初始化线程池,之后,就可以用add_task向线程池添加任务。
它会根据请求的数量自动控制池大小进行处理。
已经在LINUX下测试通过。由于ACE是跨平台的,所以这个实现也应该可以在WINDOWS下工作。
编译:
带THREAD_POOL_UNIT_TEST选项,则编译出自测程序test
gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl
thread_pool.h头文件:
主要思想是:
1. 重载ACE_Task,这相当于是个固定线程池,用一个信号量(ACE_Thread_Semaphore)来记数空闲线程数。
2. 初始化时根据用户的输入,确定最少线程数minnum和最大线程数maxnum,当多个请求到来,并且无空闲线程(信号量用光),判断总线程数小于maxnum,就开始强迫增加线程数。
3. 当线程响应完一个请求(任务)后,如果当前任务队列为空,且线程数大于minnum,就退出本线程。这里做了一个优化,就算满足条件,线程也会在队列上再等待10秒,防止线程池抖动带来不必要的开销。
使用:
重载这个类,重载service_func函数实现自己的任务处理。
start_pool初始化线程池,之后,就可以用add_task向线程池添加任务。
它会根据请求的数量自动控制池大小进行处理。
已经在LINUX下测试通过。由于ACE是跨平台的,所以这个实现也应该可以在WINDOWS下工作。
编译:
带THREAD_POOL_UNIT_TEST选项,则编译出自测程序test
gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl
thread_pool.h头文件:
#ifndef THREAD_POOL #define THREAD_POOL #include "ace/Task.h" #include "ace/Thread_Mutex.h" #include "ace/Thread_Semaphore.h" #include <ace/OS.h> class thread_pool : public ACE_Task<ACE_MT_SYNCH> { public: thread_pool (); ~thread_pool (); // begin the initial threads and waiting for request int start_pool ( int minnum = 5, // min number of thread int maxnum = 100, // max number of thread int waitsize = 1024, // request queue length int parsize = 1024); // your parameter size // pending request in work queue int wait_cnt (); // add one task to thread pool int add_task (void *arg, int size); // user defined work thread function virtual int service_func (void* arg); // overide base class function for thread pool logical virtual int svc (void); // not use virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg); private: int minnum_, maxnum_; int waitsize_, parsize_; // ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_; ACE_Thread_Semaphore *pfree_thread_; // for free thread count long thread_flags_; // ace thread create flag }; #endif /**//* THREAD_POOL */ thread_pool.cpp实现文件: #include "thread.h" #define THREAD_POOL_DONOT_ACQUIRE 0x1001 // do not aquire again in new added thread thread_pool::thread_pool () { thread_flags_ = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED; pfree_thread_ = NULL; } thread_pool::~thread_pool () { if (pfree_thread_) delete pfree_thread_; } int thread_pool::wait_cnt () { return this->msg_queue()->message_count (); } int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) { return 0; } int thread_pool::start_pool ( int minnum, int maxnum, int waitsize, int parsize ) { minnum_ = minnum; maxnum_ = maxnum; waitsize_ = waitsize; parsize_ = parsize; this->msg_queue()->high_water_mark (waitsize * parsize); pfree_thread_ = new ACE_Thread_Semaphore (minnum); int ret = this->activate (thread_flags_, minnum); return ret; } int thread_pool::add_task (void *arg, int size) { ACE_Message_Block *mb = new ACE_Message_Block (parsize_); // test free threads condition if (pfree_thread_->tryacquire () == -1) { // acquire one free thread to do work printf ("free thread used up/n"); if (this->thr_count () < maxnum_) { this->activate (thread_flags_, 1, 1); printf ("new thread release/n"); pfree_thread_->release (); printf ("new one thread, now %d/n", this->thr_count ()); } else { printf ("can't new more threads, queue len %d/n", wait_cnt () + 1); } } else { // pfree_thread_->release (); // restore cnt, let svc function do acquire work printf ("new task acquire/n"); mb->set_flags (THREAD_POOL_DONOT_ACQUIRE); } // create msg printf ("add msg/n"); memcpy (mb->wr_ptr (), (char*) arg, size); this->putq (mb); return 0; } int thread_pool::service_func (void* arg) { ACE_OS::sleep (1); printf ("finished task %d in thread %02X/n", *(int*) arg, (int)ACE_Thread::self ()); return 0; } int thread_pool::svc (void) { printf ("thread started/n"); while (1) { ACE_Message_Block *b = 0; ACE_Time_Value wait = ACE_OS::gettimeofday (); wait.sec (wait.sec () + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit if (this->getq (b, &wait) < 0) { if (this->thr_count () > minnum_) { printf ("over task acquire/n"); pfree_thread_->acquire (); printf ("delete one thread, now %d/n", this->thr_count ()-1); return 0; } else continue; // I'm the one of last min number of threads } if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0) { printf ("queue task acquire/n"); pfree_thread_->acquire (); // I'll use one free thread } else printf ("no need to acquire/n"); this->service_func ((void*)b->rd_ptr()); printf ("finished release/n"); b->release(); pfree_thread_->release (); // added one free thread } return 0; } int main (int argc, char* argv[]) { printf ("begin test:/n"); thread_pool t; t.start_pool (10, 100); for (int i=0; i<200; i++) { t.add_task (&i, sizeof(i)); if (i % 20 == 0) ACE_OS::sleep (1); } ACE_OS::sleep (2); ACE_Thread_Manager::instance()->wait(); return 0; }
相关文章推荐
- C++实现的带最大最小线程数的线程池(基于ACE)
- 最大流—最小割的C++实现
- 最大流与最小割C++实现2——深度优先搜索
- 基于算法导论6.5用最大堆实现的优先队列(C++)
- 最大堆(最小堆)C++实现源码
- 【C++基本功补习】查找三个数据中的最大值、最小值、中间值,两种实现方案比较
- c++/java/python priority_que实现最大堆和最小堆
- 基于pthread的线程池,C++实现
- C++使用两个栈实现一个可以获取栈中最大值和最小值的栈
- [C++ 实现最大值优先队列和最小值优先队列]
- 用C++实现最小公倍数和最大公约数
- SSE图像算法优化系列七:基于SSE实现的极速的矩形核腐蚀和膨胀(最大值和最小值)算法。
- 基于OpenCV和C++实现最大阈值分割算法
- 一个C++基于boost简单实现的线程池
- C++利用vector容器实现最大最小元问题
- 算法导论-第23章-最小生成树:Prime算法(基于vector)的C++实现
- C++ 实现从0~100中随机生成50个数,统计出现的数字最大值和最小值,输出出现最多的次数及对应的数字
- 基于C++求两个数的最大公约数最小公倍数
- 转 一个基于ACE的负载自适应万能线程池实现
- 算法设计之,堆,堆排序,基于最大堆的最大优先队列的实现(C++实现)