您的位置:首页 > 运维架构 > Linux

linux下简单线程池实现

2012-10-25 15:34 886 查看
/* 头文件:threadpool.h*/

#ifndef _TP_H_INCLUDED_

#define _TP_H_INCLUDED_

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <sys/types.h>

#include <pthread.h>

#include <assert.h>

typedef struct worker {

  void * (*process) (void *arg);

  void *arg;

  struct worker *next;

} threadpool_item_t;

typedef struct {

   pthread_mutex_t queue_lock;

   pthread_cond_t queue_ready;

   threadpool_item_t *queue_head;

  int shutdown; /*是否销毁线程池*/

  pthread_t *threadid;

int max_thread_num;/*线程池中允许的活动线程数目*/

   int cur_queue_size; /*当前等待队列的任务数目*/

} threadpool_t;

void pool_init (int max_thread_num);

int pool_destroy ();

int pool_add_worker (void * (*process) (void *arg), void *arg);

static void * thread_routine (void *arg);

#endif

/*具体实现:threadpool.c*/

#include "threadpool.h"

static threadpool_t *pool = NULL;

/*初始化线程池*/

void pool_init (int max_thread_num) {

pool = (threadpool_t *) malloc (sizeof (threadpool_t));

  pthread_mutex_init (&(pool->queue_lock), NULL);

  pthread_cond_init (&(pool->queue_ready), NULL);

  pool->queue_head = NULL;

  pool->max_thread_num = max_thread_num;

  pool->cur_queue_size = 0;

  pool->shutdown = 0;

  pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));

  int i;

  for (i = 0; i < max_thread_num; i++) {

    pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);

}

}

/*向线程池中加入任务*/

int pool_add_worker (void *(*process) (void *arg), void *arg) {

   /*构造一个新任务*/

  threadpool_item_t *newworker = (threadpool_item_t *) malloc (sizeof (threadpool_item_t));

   newworker->process = process;

  newworker->arg = arg;

   newworker->next = NULL;

  pthread_mutex_lock (&(pool->queue_lock));

/*将任务加入到等待队列中*/

  threadpool_item_t *member = pool->queue_head;

 if (member != NULL) {

  while (member->next != NULL) member = member->next;

member->next = newworker;

 } else{

    pool->queue_head = newworker;

}

 assert (pool->queue_head != NULL);

 pool->cur_queue_size++;

  pthread_mutex_unlock (&(pool->queue_lock));

/*唤醒一个等待线程*/

pthread_cond_signal (&(pool->queue_ready));

return 0;

}

/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出*/

int pool_destroy () {

if (pool->shutdown) return -1;/*防止两次调用*/

pool->shutdown = 1;

/*唤醒所有等待线程,线程池要销毁了*/

pthread_cond_broadcast (&(pool->queue_ready));

/*阻塞等待线程退出,否则就成僵尸了*/

int i;

for (i = 0; i < pool->max_thread_num; i++) {

  pthread_join (pool->threadid[i], NULL);

}

free (pool->threadid);

/*销毁等待队列*/

threadpool_item_t *head = NULL;

while (pool->queue_head != NULL)

{

  head = pool->queue_head;

  pool->queue_head = pool->queue_head->next;

 free (head);

}

/*销毁条件变量和互斥量*/

pthread_mutex_destroy(&(pool->queue_lock));

pthread_cond_destroy(&(pool->queue_ready));

free (pool);

pool=NULL;

return 0;

}

/*线程主函数*/

static void * thread_routine (void *arg) {

printf ("starting thread %lu\n", pthread_self ());

while (1) {

  pthread_mutex_lock (&(pool->queue_lock));

  /*如果等待队列为0并且不销毁线程池,则处于阻塞状态;*/

   while (pool->cur_queue_size == 0 && !pool->shutdown)

  {

    printf ("thread %lu is waiting\n", pthread_self ());

    pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));

   }

/*线程池要销毁了*/

if (pool->shutdown) {

    /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/

     pthread_mutex_unlock (&(pool->queue_lock));

    printf ("thread %lu will exit\n", pthread_self ());

    pthread_exit (NULL);

}

printf ("thread %lu is starting to work\n", pthread_self ());

assert (pool->cur_queue_size != 0);

assert (pool->queue_head != NULL);

/*队列长度减去1,并取出链表中的头元素*/

  pool->cur_queue_size--;

 threadpool_item_t *worker = pool->queue_head;

pool->queue_head = worker->next;

pthread_mutex_unlock (&(pool->queue_lock)); //解锁

/*调用回调函数,执行任务*/

(*(worker->process)) (worker->arg);

free (worker);

worker = NULL;

}

}

/* 下面是测试代码 */

void * myprocess (void *arg)

{

printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);

sleep (1);/*休息一秒,延长任务的执行时间*/

return NULL;

}

int main (int argc, char **argv)

{

pool_init (3);/*线程池中最多三个活动线程*/

/*连续向池中投入10个任务*/

int *workingnum = (int *) malloc (sizeof (int) * 10);

int i;

for (i = 0; i < 10; i++)

{

workingnum[i] = i;

pool_add_worker (myprocess, &workingnum[i]);

}

/*等待所有任务完成*/ sleep (5);

/*销毁线程池*/ pool_destroy ();

free(workingnum);

workingnum = NULL;

return 0;

}

编译,运行:

$ gcc -std=c99 -o threadpool threadpool.h threadpool.c -lpthread

$ ./threadpool
starting thread 1115277632
thread 1115277632 is starting to work
threadid is 1115277632, working on task 0
starting thread 1125767488
thread 1125767488 is starting to work
threadid is 1125767488, working on task 1
starting thread 1136257344
thread 1136257344 is starting to work
threadid is 1136257344, working on task 2
thread 1115277632 is starting to work
threadid is 1115277632, working on task 3
thread 1125767488 is starting to work
threadid is 1125767488, working on task 4
thread 1136257344 is starting to work
threadid is 1136257344, working on task 5
thread 1115277632 is starting to work
threadid is 1115277632, working on task 6
thread 1125767488 is starting to work
threadid is 1125767488, working on task 7
thread 1136257344 is starting to work
threadid is 1136257344, working on task 8
thread 1115277632 is starting to work
threadid is 1115277632, working on task 9
thread 1125767488 is waiting
thread 1136257344 is waiting
thread 1115277632 is waiting
thread 1125767488 will exit
thread 1136257344 will exit
thread 1115277632 will exit
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: