您的位置:首页 > 编程语言 > C语言/C++

C++线程池的简单实现方法

2014-09-05 11:01 841 查看

本文以实例形式较为详细的讲述了C++线程池的简单实现方法。分享给大家供大家参考之用。具体方法如下:

一、几个基本的线程函数:

1.线程操纵函数:

int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建
void pthread_exit(void *retval);            //终止自身
int pthread_cancel(pthread_t tid);            //终止其他.发送终止信号后目标线程不一定终止,要调用join函数等待
int pthread_join(pthread_t tid, void **retval);   //阻塞并等待其他线程

2.属性:

int pthread_attr_init(pthread_attr_t *attr);           //初始化属性
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); //设置分离状态
int pthread_attr_destroy(pthread_attr_t *attr);           //销毁属性

 

3.同步函数
互斥锁

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); //初始化锁
int pthread_mutex_destroy(pthread_mutex_t *mutex); //销毁锁
int pthread_mutex_lock(pthread_mutex_t *mutex); //加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex); //尝试加锁,上面lock的非阻塞版本
int pthread_mutex_unlock(pthread_mutex_t *mutex); //解锁

4.条件变量

int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr); //初始化
int pthread_cond_destroy(pthread_cond_t *cond);                 //销毁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);     //等待条件
int pthread_cond_signal(pthread_cond_t *cond);                 //通知,唤醒第一个调用pthread_cond_wait()而进入睡眠的线程

5.工具函数

int pthread_equal(pthread_t t1, pthread_t t2); //比较线程ID
int pthread_detach(pthread_t tid);       //分离线程
pthread_t pthread_self(void);            //自身ID

上述代码中,线程的cancel和join,以及最后的工具函数,这些函数的参数都为结构体变量,其他的函数参数都是结构体变量指针;品味一下,参数为指针的,因为都需要改变结构体的内容,而参数为普通变量的,则只需要读内容即可。

二、线程池代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>  //linux环境中多线程的头文件,非C语言标准库,编译时最后要加 -lpthread 调用动态链接库
//工作链表的结构
typedef struct worker {
void *(*process)(void *arg);  //工作函数
void *arg;           //函数的参数
struct worker *next;
}CThread_worker;
//线程池的结构
typedef struct {
pthread_mutex_t queue_lock;   //互斥锁
pthread_cond_t queue_ready;  //条件变量/信号量
CThread_worker *queue_head;   //指向工作链表的头结点,临界区
int cur_queue_size;       //记录链表中工作的数量,临界区
int max_thread_num;       //最大线程数
pthread_t *threadid;      //线程ID
int shutdown;          //开关
}CThread_pool;
static CThread_pool *pool = NULL;  //一个线程池变量
int pool_add_worker(void *(*process)(void *arg), void *arg);  //负责向工作链表中添加工作
void *thread_routine(void *arg);  //线程例程
//线程池初始化
void pool_init(int max_thread_num)
{
int i = 0;
pool = (CThread_pool *) malloc (sizeof(CThread_pool));  //创建线程池
pthread_mutex_init(&(pool->queue_lock), NULL);   //互斥锁初始化,参数为锁的地址
pthread_cond_init( &(pool->queue_ready), NULL);   //条件变量初始化,参数为变量地址
pool->queue_head = NULL;
pool->cur_queue_size = 0;
pool->max_thread_num = max_thread_num;
pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
for (i = 0; i < max_thread_num; i++) {
pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程, 参数为线程ID变量地址、属性、例程、参数
}
pool->shutdown = 0;
}
//例程,调用具体的工作函数
void *thread_routine(void *arg)
{
printf("starting thread 0x%x\n", (int)pthread_self());
while(1) {
pthread_mutex_lock(&(pool->queue_lock));  //从工作链表中取工作,要先加互斥锁,参数为锁地址
while(pool->cur_queue_size == 0 && !pool->shutdown) {    //链表为空
printf("thread 0x%x is waiting\n", (int)pthread_self());
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));  //等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。
}
if(pool->shutdown) {
pthread_mutex_unlock(&(pool->queue_lock));     //结束开关开启,释放锁并退出线程
printf("thread 0x%x will exit\n", (int)pthread_self());
pthread_exit(NULL);   //参数为void *
}
printf("thread 0x%x is starting to work\n", (int)pthread_self());
--pool->cur_queue_size;
CThread_worker *worker = pool->queue_head;
pool->queue_head = worker->next;
pthread_mutex_unlock (&(pool->queue_lock));   //获取一个工作后释放锁
(*(worker->process))(worker->arg);   //做工作
free(worker);
worker = NULL;
}
pthread_exit(NULL);
}
//销毁线程池
int pool_destroy()
{
if(pool->shutdown)   //检测结束开关是否开启,若开启,则所有线程会自动退出
return -1;
pool->shutdown = 1;
pthread_cond_broadcast( &(pool->queue_ready) );   //广播,唤醒所有线程,准备退出
int i;
for(i = 0; i < pool->max_thread_num; ++i)
pthread_join(pool->threadid[i], NULL);   //主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void **,接收exit的返回值,需要强制转换
free(pool->threadid);
CThread_worker *head = NULL;
while(pool->queue_head != NULL) {      //释放未执行的工作链表剩余结点
head = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free(head);
}
pthread_mutex_destroy(&(pool->queue_lock));   //销毁锁和条件变量
pthread_cond_destroy(&(pool->queue_ready));
free(pool);
pool=NULL;
return 0;
}
void *myprocess(void *arg)
{
printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep (1);
return NULL;
}
//添加工作
int pool_add_worker(void *(*process)(void *arg), void *arg)
{
CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
newworker->process = process;  //具体的工作函数
newworker->arg = arg;
newworker->next = NULL;
pthread_mutex_lock( &(pool->queue_lock) );   //加锁
CThread_worker *member = pool->queue_head;   //插入链表尾部
if( member != NULL ) {
while( member->next != NULL )
member = member->next;
member->next = newworker;
}
else {
pool->queue_head = newworker;
}
++pool->cur_queue_size;
pthread_mutex_unlock( &(pool->queue_lock) );  //解锁
pthread_cond_signal( &(pool->queue_ready) );  //通知一个等待的线程
return 0;
}
int main(int argc, char **argv)
{
pool_init(3);  //主线程创建线程池,3个线程
int *workingnum = (int *) malloc(sizeof(int) * 10);
int i;
for(i = 0; i < 10; ++i) {
workingnum[i] = i;
pool_add_worker(myprocess, &workingnum[i]);   //主线程负责添加工作,10个工作
}
sleep (5);
pool_destroy();   //销毁线程池
free (workingnum);
return 0;
}

希望本文所述对大家的C++程序设计有所帮助。

您可能感兴趣的文章:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  C++ 线程池