您的位置:首页 > 运维架构 > Linux

线程池(Linux实现)

2014-03-26 17:43 357 查看
讨论QQ群:135202158

本文技术参考了sourceforge项目c thread pool,链接:http://sourceforge.net/projects/cthpool/

线程池如上一篇随笔(/article/5928096.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。

要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:

含N个线程的线程组

工作队列

工作线程例程

线程组和工作队列表示如下:

/*
*     Threads:
*
*     +----------+----------+------+------------+
*     | thread 0 | thread 1 | .... | thread n-1 |
*     +----------+----------+------+------------+
*
*     Job Queue:
*
*        back                            front
*         |                                |
*         v                                v
*     +-------+    +-------+           +-------+
*     | job 0 | -> | job 1 | -> ... -> | job x |
*     +-------+    +-------+           +-------+
*
*/


线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。

工作线程例程(线程函数)的大致执行流程如下图所示:

/*
*
*     Each Thread Routine:
*                                 Job-Queue
*              |                    ...
*              v                     |
*          +-------+            +---------+   EnQueue
*    +---> | sleep |  (No job)  | new job | <--------- Client
*    |     +-------+            +---------+
*    |         |                     |
*    |         |     DeQueue    +---------+
*    |         +  <-----------  | new job |
*    |         |                +---------+
*    |         v
*    |    +---------+
*    |    | do work |
*    |    +---------+
*    |         |
*    |         |
*    +----<----+
*
*/


工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。

另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。

下面给出完整代码。

1. threadpool.h

/*
* Linux线程池的简单实现.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn *
**/

#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__

#include <semaphore.h>
#include <pthread.h>
#include <queue>

#define  DLPTP_MAX_THREADS    1024

struct tp_job_t
{
void        (*work) (void*);
void*        arg;
};

struct tp_threadpool_t
{
pthread_t*            threads;
size_t                nthreads;
std::queue<tp_job_t>    jobs;
sem_t                njobs;
pthread_mutex_t        lock;
bool                running;
};

tp_threadpool_t*  tp_init(size_t _nthreads);
int     tp_deinit(tp_threadpool_t* _ptp);
void*   tp_worker(void* _ptp);
int     tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg);

#endif


2. threadpool.cpp

/*
* Linux线程池的简单实现.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn *
**/

#include "threadpool.h"

tp_threadpool_t*  tp_init(size_t _nthreads)
{
if(_nthreads < 1 || _nthreads > DLPTP_MAX_THREADS)
return  NULL;

int  err = 0;
tp_threadpool_t*  ret = NULL;
size_t  i, j;

ret = new tp_threadpool_t;
if(NULL == ret)
return  NULL;
ret->nthreads = _nthreads;
ret->threads = new pthread_t[_nthreads];
if(NULL == ret->threads)
{
delete ret;
return  NULL;
}
ret->running = true;

err = sem_init(&ret->njobs, 0, 0);
if(-1 == err)
{
delete[] ret->threads;
delete ret;
return  NULL;
}

err = pthread_mutex_init(&ret->lock, NULL);
if(err)
{
sem_destroy(&ret->njobs);
delete[] ret->threads;
delete ret;
return  NULL;
}

for(i=0; i<_nthreads; ++i)
{
err = pthread_create(&ret->threads[i], NULL, tp_worker, (void*)ret);
if(err)
{
ret->running = false;
for(j=0; j<i; ++j)
{
pthread_cancel(ret->threads[j]);
pthread_join(ret->threads[j], NULL);
}
pthread_mutex_destroy(&ret->lock);
sem_destroy(&ret->njobs);
delete[] ret->threads;
delete ret;
return  NULL;
}
}

return ret;
}

int  tp_deinit(tp_threadpool_t* _ptp)
{
if(NULL == _ptp)
return  -1;

int  err = 0;
size_t  i, j;

// TODO: if now worker has job to handle, do something then exit
while(!_ptp->jobs.empty());

_ptp->running = false;

for(i=0; i<_ptp->nthreads; ++i)
{
err = sem_post(&_ptp->njobs);              /* V, ++ */
if(err)
{
for(j=i; j<_ptp->nthreads; ++j)
pthread_cancel(_ptp->threads[j]);
break;
}
}

for(i=0; i<_ptp->nthreads; ++i)
pthread_join(_ptp->threads[i], NULL);

pthread_mutex_destroy(&_ptp->lock);
sem_destroy(&_ptp->njobs);

delete[] _ptp->threads; _ptp->threads = NULL;
delete _ptp;            _ptp = NULL;

return  0;
}

void*  tp_worker(void* _ptp)
{
if(NULL == _ptp)
return  NULL;

tp_threadpool_t* p = (tp_threadpool_t*)_ptp;

while(p->running)
{
sem_wait(&p->njobs);                /* P, -- */

if(!p->running)
return  NULL;

void   (*work) (void*);
void*  arg;
tp_job_t  job;

pthread_mutex_lock(&p->lock);       /* LOCK */

job = p->jobs.front();
work = job.work;
arg = job.arg;
p->jobs.pop();

pthread_mutex_unlock(&p->lock);     /* UNLOCK */

work(arg);
}

return  NULL;
}

int  tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg)
{
if(NULL == _ptp || NULL == _work)
return  -1;

tp_job_t  job;
job.work = _work;
job.arg = _arg;

pthread_mutex_lock(&_ptp->lock);        /* LOCK */
_ptp->jobs.push(job);
sem_post(&_ptp->njobs);                 /* V, ++ */
pthread_mutex_unlock(&_ptp->lock);      /* UNLOCK */

return  0;
}


3. 测试程序main.cpp

/*
* Linux线程池测试.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn *
**/

#include <unistd.h>
#include <stdio.h>
#include "threadpool.h"

/* task 1 */
void task1(void* _arg)
{
printf("# Thread working: %u\n", (int)pthread_self());
printf("  Task 1 running..\n");
usleep(5000);
}

/* task 2 */
void task2(void* _arg)
{
printf("# Thread working: %u\n", (int)pthread_self());
printf("  Task 2 running..  ");
printf("%d\n", *((int*)_arg));
usleep(5000);
}

#define  N_THREADS  4

int  main(int argc, char** argv)
{
tp_threadpool_t*  ptp = NULL;
int  i;

ptp = tp_init(N_THREADS);
if(NULL == ptp)
{
fprintf(stderr, "tp_init fail\n");
return -1;
}

int  a = 32;
for(i=0; i<10; ++i)
{
tp_add_job(ptp, task1, NULL);
tp_add_job(ptp, task2, (void*)&a);
}

tp_deinit(ptp);

return  0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: