Linux下调用pthread库实现简单线程池
2014-03-10 23:00
363 查看
[c-sharp] view
plaincopy
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
void * routine(void * arg);
int pool_add_job(void *(*process)(void * arg),void *arg);
int pool_init(unsigned int thread_num);
int pool_destroy(void);
void *test(void *);
/* We define a queue of jobs which will be processed in thread pool*/
typedef struct job{
void * (*process)(void *arg); /* process() will employed on job*/
void *arg;/* argument to process */
struct job * next;
}Job;
/*
* A threadpool must have the following parts:
* threads: a list of threads
* jobs : jobs in thread pool
* pool_lock: pthread_mutex_lock for thread pool
* job_ready: pthread_cond_t for job ready
* destroy: flag indicate whether the pool shall be destroyed
* size: current size of jobs list
* thread_num: max thread num initialized in pool
*/
typedef struct thread_pool{
pthread_mutex_t pool_lock;
pthread_cond_t job_ready;
Job * jobs;
int destroy;
pthread_t * threads;
unsigned int thread_num;
int size;
}Thread_pool;
/* global Thread_pool variable*/
static Thread_pool * pool=NULL;
/*Initialize the thread pool*/
int pool_init(unsigned int thread_num)
{
pool=(Thread_pool *)malloc(sizeof(Thread_pool));
if(NULL==pool)
return -1;
pthread_mutex_init(&(pool->pool_lock),NULL);
pthread_cond_init(&(pool->job_ready),NULL);
pool->jobs=NULL;
pool->thread_num=thread_num;
pool->size=0;
pool->destroy=0;
pool->threads=(pthread_t *)malloc(thread_num * sizeof(pthread_t));
int i;
for(i=0;i<thread_num;i++){
pthread_create(&(pool->threads[i]),NULL,routine,NULL);
}
return 0;
}
/*
* Add job into the pool
* assign it to some thread
*/
int pool_add_job(void *(*process)(void *),void *arg)
{
Job * newjob=(Job *)malloc(sizeof(Job));
newjob->process=process;
newjob->arg=arg;
newjob->next=NULL;
pthread_mutex_lock(&(pool->pool_lock));
Job * temp=pool->jobs;
if(temp!=NULL){
while(temp->next)
temp=temp->next;
temp->next=newjob;
}
else{
pool->jobs=newjob;
}
/*For debug*/
assert(pool->jobs!=NULL);
pool->size++;
pthread_mutex_unlock(&(pool->pool_lock));
/*Rouse a thread to process this new job*/
pthread_cond_signal(&(pool->job_ready));
return 0;
}
/*Destroy the thread pool*/
int pool_destroy(void)
{
if(pool->destroy)/*Alread destroyed!*/
return -1;
int i;
pool->destroy=1;
pthread_cond_broadcast(&(pool->job_ready));/*notify all threads*/
for(i=0;i<pool->thread_num;i++)
pthread_join(pool->threads[i],NULL);
free(pool->threads);
Job * head=NULL;
while(pool->jobs){
head=pool->jobs;
pool->jobs=pool->jobs->next;
free(head);
}
pthread_mutex_destroy(&(pool->pool_lock));
pthread_cond_destroy(&(pool->job_ready));
free(pool);
pool=NULL;
return 0;
}
/*Every thread call this function*/
void * routine(void *arg)
{
printf("start thread %u/n",pthread_self());
while(1){
pthread_mutex_lock(&(pool->pool_lock));
while(pool->size==0 && !pool->destroy){
printf("thread %u is waiting/n",pthread_self());
pthread_cond_wait(&(pool->job_ready),&(pool->pool_lock));
}
if(pool->destroy){
pthread_mutex_unlock(&(pool->pool_lock));
printf("thread %u will exit/n",pthread_self());
pthread_exit(NULL);
}
printf("thread %u is starting to work/n",pthread_self());
/*For debug*/
assert(pool->size!=0);
assert(pool->jobs!=NULL);
pool->size--;
Job * job=pool->jobs;
pool->jobs=job->next;
pthread_mutex_unlock(&(pool->pool_lock));
/*process job*/
(*(job->process))(job->arg);
free(job);
job=NULL;
}
/*You should never get here*/
pthread_exit(NULL);
}
void *test(void *arg)
{
printf("thread %u is working on job %u/n",pthread_self(),*(int *)arg);
sleep(1);
return NULL;
}
int main(int argc,char **argv)
{
int i;
int *jobs=(int *)malloc(sizeof(int)*10);
pool_init(3);
for(i=0;i<10;i++){
jobs[i]=i;
pool_add_job(test,&jobs[i]);
}
sleep(5);
pool_destroy();
free(jobs);
return EXIT_SUCCESS;
}
plaincopy
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
void * routine(void * arg);
int pool_add_job(void *(*process)(void * arg),void *arg);
int pool_init(unsigned int thread_num);
int pool_destroy(void);
void *test(void *);
/* We define a queue of jobs which will be processed in thread pool*/
typedef struct job{
void * (*process)(void *arg); /* process() will employed on job*/
void *arg;/* argument to process */
struct job * next;
}Job;
/*
* A threadpool must have the following parts:
* threads: a list of threads
* jobs : jobs in thread pool
* pool_lock: pthread_mutex_lock for thread pool
* job_ready: pthread_cond_t for job ready
* destroy: flag indicate whether the pool shall be destroyed
* size: current size of jobs list
* thread_num: max thread num initialized in pool
*/
typedef struct thread_pool{
pthread_mutex_t pool_lock;
pthread_cond_t job_ready;
Job * jobs;
int destroy;
pthread_t * threads;
unsigned int thread_num;
int size;
}Thread_pool;
/* global Thread_pool variable*/
static Thread_pool * pool=NULL;
/*Initialize the thread pool*/
int pool_init(unsigned int thread_num)
{
pool=(Thread_pool *)malloc(sizeof(Thread_pool));
if(NULL==pool)
return -1;
pthread_mutex_init(&(pool->pool_lock),NULL);
pthread_cond_init(&(pool->job_ready),NULL);
pool->jobs=NULL;
pool->thread_num=thread_num;
pool->size=0;
pool->destroy=0;
pool->threads=(pthread_t *)malloc(thread_num * sizeof(pthread_t));
int i;
for(i=0;i<thread_num;i++){
pthread_create(&(pool->threads[i]),NULL,routine,NULL);
}
return 0;
}
/*
* Add job into the pool
* assign it to some thread
*/
int pool_add_job(void *(*process)(void *),void *arg)
{
Job * newjob=(Job *)malloc(sizeof(Job));
newjob->process=process;
newjob->arg=arg;
newjob->next=NULL;
pthread_mutex_lock(&(pool->pool_lock));
Job * temp=pool->jobs;
if(temp!=NULL){
while(temp->next)
temp=temp->next;
temp->next=newjob;
}
else{
pool->jobs=newjob;
}
/*For debug*/
assert(pool->jobs!=NULL);
pool->size++;
pthread_mutex_unlock(&(pool->pool_lock));
/*Rouse a thread to process this new job*/
pthread_cond_signal(&(pool->job_ready));
return 0;
}
/*Destroy the thread pool*/
int pool_destroy(void)
{
if(pool->destroy)/*Alread destroyed!*/
return -1;
int i;
pool->destroy=1;
pthread_cond_broadcast(&(pool->job_ready));/*notify all threads*/
for(i=0;i<pool->thread_num;i++)
pthread_join(pool->threads[i],NULL);
free(pool->threads);
Job * head=NULL;
while(pool->jobs){
head=pool->jobs;
pool->jobs=pool->jobs->next;
free(head);
}
pthread_mutex_destroy(&(pool->pool_lock));
pthread_cond_destroy(&(pool->job_ready));
free(pool);
pool=NULL;
return 0;
}
/*Every thread call this function*/
void * routine(void *arg)
{
printf("start thread %u/n",pthread_self());
while(1){
pthread_mutex_lock(&(pool->pool_lock));
while(pool->size==0 && !pool->destroy){
printf("thread %u is waiting/n",pthread_self());
pthread_cond_wait(&(pool->job_ready),&(pool->pool_lock));
}
if(pool->destroy){
pthread_mutex_unlock(&(pool->pool_lock));
printf("thread %u will exit/n",pthread_self());
pthread_exit(NULL);
}
printf("thread %u is starting to work/n",pthread_self());
/*For debug*/
assert(pool->size!=0);
assert(pool->jobs!=NULL);
pool->size--;
Job * job=pool->jobs;
pool->jobs=job->next;
pthread_mutex_unlock(&(pool->pool_lock));
/*process job*/
(*(job->process))(job->arg);
free(job);
job=NULL;
}
/*You should never get here*/
pthread_exit(NULL);
}
void *test(void *arg)
{
printf("thread %u is working on job %u/n",pthread_self(),*(int *)arg);
sleep(1);
return NULL;
}
int main(int argc,char **argv)
{
int i;
int *jobs=(int *)malloc(sizeof(int)*10);
pool_init(3);
for(i=0;i<10;i++){
jobs[i]=i;
pool_add_job(test,&jobs[i]);
}
sleep(5);
pool_destroy();
free(jobs);
return EXIT_SUCCESS;
}
相关文章推荐
- Linux C++ 一个线程池的简单实现(附代码)
- 【收集】Linux线程池(C语言)及简单实现示例
- Linux:内核模块实现替换系统调用的简单例子
- Linux--线程池与进程池及线程池的简单实现
- Linux 下C语言简单实现线程池
- linux 下c++线程池的简单实现(在老外代码上添加注释)
- Linux下简单线程池的实现
- Linux下线程池的理解与简单实现
- Linux 下C语言简单实现线程池
- 【转载】实现一个简单的linux线程池
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池
- linux下c/c++实例之十三C实现的简单的线程池
- Linux:内核模块实现替换系统调用的简单例子
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池
- linux下用c语言实现一个简单的线程池
- Linux 下C语言简单实现线程池
- LINUX C++ 线程池简单实现之双队列
- 如何在Linux下实现你的线程池(Step By Step,Pthread)
- Linux 下C语言简单实现线程池
- Linux--线程池与进程池及线程池的简单实现