线程池(Linux实现)
2014-03-26 17:43
357 查看
讨论QQ群:135202158
本文技术参考了sourceforge项目c thread pool,链接:http://sourceforge.net/projects/cthpool/
线程池如上一篇随笔(/article/5928096.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。
要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
含N个线程的线程组
工作队列
工作线程例程
线程组和工作队列表示如下:
线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
工作线程例程(线程函数)的大致执行流程如下图所示:
工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
下面给出完整代码。
1. threadpool.h
2. threadpool.cpp
3. 测试程序main.cpp
本文技术参考了sourceforge项目c thread pool,链接:http://sourceforge.net/projects/cthpool/
线程池如上一篇随笔(/article/5928096.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。
要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
含N个线程的线程组
工作队列
工作线程例程
线程组和工作队列表示如下:
/* * Threads: * * +----------+----------+------+------------+ * | thread 0 | thread 1 | .... | thread n-1 | * +----------+----------+------+------------+ * * Job Queue: * * back front * | | * v v * +-------+ +-------+ +-------+ * | job 0 | -> | job 1 | -> ... -> | job x | * +-------+ +-------+ +-------+ * */
线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
工作线程例程(线程函数)的大致执行流程如下图所示:
/* * * Each Thread Routine: * Job-Queue * | ... * v | * +-------+ +---------+ EnQueue * +---> | sleep | (No job) | new job | <--------- Client * | +-------+ +---------+ * | | | * | | DeQueue +---------+ * | + <----------- | new job | * | | +---------+ * | v * | +---------+ * | | do work | * | +---------+ * | | * | | * +----<----+ * */
工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
下面给出完整代码。
1. threadpool.h
/* * Linux线程池的简单实现. * Author: 赵子清 * Blog: http://www.cnblogs.com/zzqcn * **/ #ifndef __THREADPOOL_H__ #define __THREADPOOL_H__ #include <semaphore.h> #include <pthread.h> #include <queue> #define DLPTP_MAX_THREADS 1024 struct tp_job_t { void (*work) (void*); void* arg; }; struct tp_threadpool_t { pthread_t* threads; size_t nthreads; std::queue<tp_job_t> jobs; sem_t njobs; pthread_mutex_t lock; bool running; }; tp_threadpool_t* tp_init(size_t _nthreads); int tp_deinit(tp_threadpool_t* _ptp); void* tp_worker(void* _ptp); int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg); #endif
2. threadpool.cpp
/* * Linux线程池的简单实现. * Author: 赵子清 * Blog: http://www.cnblogs.com/zzqcn * **/ #include "threadpool.h" tp_threadpool_t* tp_init(size_t _nthreads) { if(_nthreads < 1 || _nthreads > DLPTP_MAX_THREADS) return NULL; int err = 0; tp_threadpool_t* ret = NULL; size_t i, j; ret = new tp_threadpool_t; if(NULL == ret) return NULL; ret->nthreads = _nthreads; ret->threads = new pthread_t[_nthreads]; if(NULL == ret->threads) { delete ret; return NULL; } ret->running = true; err = sem_init(&ret->njobs, 0, 0); if(-1 == err) { delete[] ret->threads; delete ret; return NULL; } err = pthread_mutex_init(&ret->lock, NULL); if(err) { sem_destroy(&ret->njobs); delete[] ret->threads; delete ret; return NULL; } for(i=0; i<_nthreads; ++i) { err = pthread_create(&ret->threads[i], NULL, tp_worker, (void*)ret); if(err) { ret->running = false; for(j=0; j<i; ++j) { pthread_cancel(ret->threads[j]); pthread_join(ret->threads[j], NULL); } pthread_mutex_destroy(&ret->lock); sem_destroy(&ret->njobs); delete[] ret->threads; delete ret; return NULL; } } return ret; } int tp_deinit(tp_threadpool_t* _ptp) { if(NULL == _ptp) return -1; int err = 0; size_t i, j; // TODO: if now worker has job to handle, do something then exit while(!_ptp->jobs.empty()); _ptp->running = false; for(i=0; i<_ptp->nthreads; ++i) { err = sem_post(&_ptp->njobs); /* V, ++ */ if(err) { for(j=i; j<_ptp->nthreads; ++j) pthread_cancel(_ptp->threads[j]); break; } } for(i=0; i<_ptp->nthreads; ++i) pthread_join(_ptp->threads[i], NULL); pthread_mutex_destroy(&_ptp->lock); sem_destroy(&_ptp->njobs); delete[] _ptp->threads; _ptp->threads = NULL; delete _ptp; _ptp = NULL; return 0; } void* tp_worker(void* _ptp) { if(NULL == _ptp) return NULL; tp_threadpool_t* p = (tp_threadpool_t*)_ptp; while(p->running) { sem_wait(&p->njobs); /* P, -- */ if(!p->running) return NULL; void (*work) (void*); void* arg; tp_job_t job; pthread_mutex_lock(&p->lock); /* LOCK */ job = p->jobs.front(); work = job.work; arg = job.arg; p->jobs.pop(); pthread_mutex_unlock(&p->lock); /* UNLOCK */ work(arg); } return NULL; } int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg) { if(NULL == _ptp || NULL == _work) return -1; tp_job_t job; job.work = _work; job.arg = _arg; pthread_mutex_lock(&_ptp->lock); /* LOCK */ _ptp->jobs.push(job); sem_post(&_ptp->njobs); /* V, ++ */ pthread_mutex_unlock(&_ptp->lock); /* UNLOCK */ return 0; }
3. 测试程序main.cpp
/* * Linux线程池测试. * Author: 赵子清 * Blog: http://www.cnblogs.com/zzqcn * **/ #include <unistd.h> #include <stdio.h> #include "threadpool.h" /* task 1 */ void task1(void* _arg) { printf("# Thread working: %u\n", (int)pthread_self()); printf(" Task 1 running..\n"); usleep(5000); } /* task 2 */ void task2(void* _arg) { printf("# Thread working: %u\n", (int)pthread_self()); printf(" Task 2 running.. "); printf("%d\n", *((int*)_arg)); usleep(5000); } #define N_THREADS 4 int main(int argc, char** argv) { tp_threadpool_t* ptp = NULL; int i; ptp = tp_init(N_THREADS); if(NULL == ptp) { fprintf(stderr, "tp_init fail\n"); return -1; } int a = 32; for(i=0; i<10; ++i) { tp_add_job(ptp, task1, NULL); tp_add_job(ptp, task2, (void*)&a); } tp_deinit(ptp); return 0; }
相关文章推荐
- 一个Linux下C线程池的实现
- 【转】【Linux】生产者消费者编程实现-线程池+信号量 - 江南烟雨 - 博客频道 - CSDN.NET
- Linux 下C语言简单实现线程池
- Linux下简单的多线程编程--线程池的实现
- Linux多线程实践(9) --简单线程池的设计与实现
- linux c 线程池(互斥变量+条件变量+模块化实现)(V1.0)
- Linux--线程池与进程池及线程池的简单实现
- 一个Linux下C线程池的实现
- Linux 下C语言简单实现线程池
- 一个Linux下C线程池的实现(转)
- Linux 下C语言简单实现线程池
- Linux下简单线程池的实现
- Linux下线程池的理解与简单实现
- 利用Linux系统函数实现线程池(C++)
- 一个Linux下C线程池的实现
- Linux下C实现线程池
- Linux--线程池与进程池及线程池的简单实现
- 一个Linux下C线程池的实现(转)
- 非常精简的Linux线程池实现(二)——使用信号量和自旋锁进一步简化程序
- linux下C语言简单实现线程池