您的位置:首页 > 运维架构 > Linux

linux下C实现线程池的源代码

2008-05-18 13:46 267 查看
ThreadPool
ThreadPool头文件:




/**//**


 * threadpool.h


 *


 * This file declares the functionality associated with


 * your implementation of a threadpool.


 * 线程池的实现


 */




#ifndef __threadpool_h__


#define __threadpool_h__




#ifdef __cplusplus




extern "C" ...{


#endif




// maximum number of threads allowed in a pool


//最大线程数在池中


#define MAXT_IN_POOL 200




// You must hide the internal details of the threadpool


// structure from callers, thus declare threadpool of type "void".


// In threadpool.c, you will use type conversion to coerce


// variables of type "threadpool" back and forth to a


// richer, internal type.  (See threadpool.c for details.)


//为了向使用者隐藏线程池的内部结构细节,将threadpool声明为void*


//在threadpool.c中可以用类型转换转换回来,细节请看threadpool.c




typedef void *threadpool;




// "dispatch_fn" declares a typed function pointer.  A


// variable of type "dispatch_fn" points to a function


// with the following signature:


// 


//     void dispatch_function(void *arg);


//dispatch_fn定义函数指针




typedef void (*dispatch_fn)(void *);






/**//**


 * create_threadpool creates a fixed-sized thread


 * pool.  If the function succeeds, it returns a (non-NULL)


 * "threadpool", else it returns NULL.


 */


//创建固定大小的线程池,如果创建成功,函数返回非空值,否则返回空值


threadpool create_threadpool(int num_threads_in_pool);








/**//**


 * dispatch sends a thread off to do some work.  If


 * all threads in the pool are busy, dispatch will


 * block until a thread becomes free and is dispatched.


 * 


 * Once a thread is dispatched, this function returns


 * immediately.


 * 


 * The dispatched thread calls into the function


 * "dispatch_to_here" with argument "arg".


 */


//分配一个线程完成请求,如果池中所有线程都非空闲,调度程序阻塞直到有线程空闲并马上调度


//一旦一个线程被调度,函数马上返回


//这个线程调用函数dispathch_to_here,arg作为函数参数


int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here,


          void *arg);






/**//**


 * destroy_threadpool kills the threadpool, causing


 * all threads in it to commit suicide, and then


 * frees all the memory associated with the threadpool.


 */


//销毁线程池,使池中所有线程自杀,之后释放所有相关内存


void destroy_threadpool(threadpool destroyme);




#ifdef __cplusplus


}


#endif




#endif



线程池实现代码文件:




/**//**


 * threadpool.c


 *


 * This file will contain your implementation of a threadpool.


 * 此文件包含线路池的具体实现


 */




#include <stdio.h>


#include <stdlib.h>


#include <unistd.h>


#include <pthread.h>


#include <string.h>




#include "threadpool.h"






typedef struct _thread_st ...{


    pthread_t id;


    pthread_mutex_t mutex;


    pthread_cond_t cond;


    dispatch_fn fn;


    void *arg;


    threadpool parent;


} _thread;




// _threadpool is the internal threadpool structure that is


// cast to type "threadpool" before it given out to callers


// _threadpool是内部线程池结构,转换成类型“threadpool”在提交给使用者之前




typedef struct _threadpool_st ...{


    // you should fill in this structure with whatever you need


    pthread_mutex_t tp_mutex;


    pthread_cond_t tp_idle;


    pthread_cond_t tp_full;


    pthread_cond_t tp_empty;


    _thread ** tp_list;


    int tp_index;


    int tp_max_index;


    int tp_stop;




    int tp_total;


} _threadpool;




threadpool create_threadpool(int num_threads_in_pool)




...{


    _threadpool *pool;




    // sanity check the argument


        //参数检查


    if ((num_threads_in_pool <= 0) || (num_threads_in_pool > MAXT_IN_POOL))


        return NULL;




    pool = (_threadpool *) malloc(sizeof(_threadpool));




    if (pool == NULL) ...{


        fprintf(stderr, "Out of memory creating a new threadpool! ");


        return NULL;


    }




    // add your code here to initialize the newly created threadpool


    pthread_mutex_init( &pool->tp_mutex, NULL );


    pthread_cond_init( &pool->tp_idle, NULL );


    pthread_cond_init( &pool->tp_full, NULL );


    pthread_cond_init( &pool->tp_empty, NULL );


    pool->tp_max_index = num_threads_in_pool;


    pool->tp_index = 0;


    pool->tp_stop = 0;


    pool->tp_total = 0;


    pool->tp_list = ( _thread ** )malloc( sizeof( void * ) * MAXT_IN_POOL );


    memset( pool->tp_list, 0, sizeof( void * ) * MAXT_IN_POOL );




    return (threadpool) pool;


}




int save_thread( _threadpool * pool, _thread * thread )




...{


    int ret = -1;




    pthread_mutex_lock( &pool->tp_mutex );






    if( pool->tp_index < pool->tp_max_index ) ...{


        pool->tp_list[ pool->tp_index ] = thread;


        pool->tp_index++;


        ret = 0;




        pthread_cond_signal( &pool->tp_idle );






        if( pool->tp_index >= pool->tp_total ) ...{


            pthread_cond_signal( &pool->tp_full );


        }


    }




    pthread_mutex_unlock( &pool->tp_mutex );




    return ret;


}




void * wrapper_fn( void * arg )




...{


    _thread * thread = (_thread*)arg;


    _threadpool * pool = (_threadpool*)thread->parent;






    for( ; 0 == ((_threadpool*)thread->parent)->tp_stop; ) ...{


        thread->fn( thread->arg );




        pthread_mutex_lock( &thread->mutex );




        if( 0 == save_thread( thread->parent, thread ) ) ...{


            pthread_cond_wait( &thread->cond, &thread->mutex );


            pthread_mutex_unlock( &thread->mutex );




        } else ...{


            pthread_mutex_unlock( &thread->mutex );


            pthread_cond_destroy( &thread->cond );


            pthread_mutex_destroy( &thread->mutex );




            free( thread );


            break;


        }


    }




    pthread_mutex_lock( &pool->tp_mutex );


    pool->tp_total--;


    if( pool->tp_total <= 0 ) pthread_cond_signal( &pool->tp_empty );


    pthread_mutex_unlock( &pool->tp_mutex );




    return NULL;


}




int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here, void *arg)




...{


    int ret = 0;




    _threadpool *pool = (_threadpool *) from_me;


    pthread_attr_t attr;


    _thread * thread = NULL;




    // add your code here to dispatch a thread


    pthread_mutex_lock( &pool->tp_mutex );






    if( pool->tp_index <= 0 && pool->tp_total >= pool->tp_max_index ) ...{


        pthread_cond_wait( &pool->tp_idle, &pool->tp_mutex );


    }






    if( pool->tp_index <= 0 ) ...{


        _thread * thread = ( _thread * )malloc( sizeof( _thread ) );


        thread->id = 0;


        pthread_mutex_init( &thread->mutex, NULL );


        pthread_cond_init( &thread->cond, NULL );


        thread->fn = dispatch_to_here;


        thread->arg = arg;


        thread->parent = pool;




        pthread_attr_init( &attr );


        pthread_attr_setdetachstate( &attr,PTHREAD_CREATE_DETACHED );






        if( 0 == pthread_create( &thread->id, &attr, wrapper_fn, thread ) ) ...{


            pool->tp_total++;


            printf( "create thread#%ld ", thread->id );




        } else ...{


            ret = -1;


            printf( "cannot create thread " );


            pthread_mutex_destroy( &thread->mutex );


            pthread_cond_destroy( &thread->cond );


            free( thread );


        }




    } else ...{


        pool->tp_index--;


        thread = pool->tp_list[ pool->tp_index ];


        pool->tp_list[ pool->tp_index ] = NULL;




        thread->fn = dispatch_to_here;


        thread->arg = arg;


        thread->parent = pool;




        pthread_mutex_lock( &thread->mutex );


        pthread_cond_signal( &thread->cond ) ;


        pthread_mutex_unlock ( &thread->mutex );


    }




    pthread_mutex_unlock( &pool->tp_mutex );




    return ret;


}




void destroy_threadpool(threadpool destroyme)




...{


    _threadpool *pool = (_threadpool *) destroyme;




    // add your code here to kill a threadpool


    int i = 0;




    pthread_mutex_lock( &pool->tp_mutex );






    if( pool->tp_index < pool->tp_total ) ...{


        printf( "waiting for %d thread(s) to finish ", pool->tp_total - pool->tp_index );


        pthread_cond_wait( &pool->tp_full, &pool->tp_mutex );


    }




    pool->tp_stop = 1;






    for( i = 0; i < pool->tp_index; i++ ) ...{


        _thread * thread = pool->tp_list[ i ];




        pthread_mutex_lock( &thread->mutex );


        pthread_cond_signal( &thread->cond ) ;


        pthread_mutex_unlock ( &thread->mutex );


    }






    if( pool->tp_total > 0 ) ...{


        printf( "waiting for %d thread(s) to exit ", pool->tp_total );


        pthread_cond_wait( &pool->tp_empty, &pool->tp_mutex );


    }






    for( i = 0; i < pool->tp_index; i++ ) ...{


        free( pool->tp_list[ i ] );


        pool->tp_list[ i ] = NULL;


    }




    pthread_mutex_unlock( &pool->tp_mutex );




    pool->tp_index = 0;




    pthread_mutex_destroy( &pool->tp_mutex );


    pthread_cond_destroy( &pool->tp_idle );


    pthread_cond_destroy( &pool->tp_full );


    pthread_cond_destroy( &pool->tp_empty );




    free( pool->tp_list );


    free( pool );


}



下面是一个线程池测试程序:




/**//**


 * threadpool_test.c, copyright 2001 Steve Gribble


 *


 * Just a regression test for the threadpool code.


 * 仅为以上线程池代码的一个测试


 */




#include <stdio.h>


#include <stdlib.h>


#include <unistd.h>


#include <pthread.h>


#include <errno.h>


#include <stdarg.h>


#include "threadpool.h"




extern int errno;






void mylog( FILE * fp, const  char  *format,  /**//*args*/ ...)




...{


    va_list ltVaList;


    va_start( ltVaList, format );


    vprintf( format, ltVaList );


    va_end( ltVaList );




    fflush( stdout );


}






void dispatch_threadpool_to_me(void *arg) ...{


  int seconds = (int) arg;




  fprintf(stdout, "  in dispatch_threadpool %d ", seconds);


  fprintf(stdout, "  thread#%ld ", pthread_self() );


  sleep(seconds);


  fprintf(stdout, "  done dispatch_threadpool %d ", seconds);


}






int main(int argc, char **argv) ...{


  threadpool tp;




  tp = create_threadpool(2);




  fprintf(stdout, "**main** dispatch_threadpool 3 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 3);


  fprintf(stdout, "**main** dispatch_threadpool 6 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 6);


  fprintf(stdout, "**main** dispatch_threadpool 7 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 7);




  fprintf(stdout, "**main** done first ");


  sleep(20);


  fprintf(stdout, " ");




  fprintf(stdout, "**main** dispatch_threadpool 3 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 3);


  fprintf(stdout, "**main** dispatch_threadpool 6 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 6);


  fprintf(stdout, "**main** dispatch_threadpool 7 ");


  dispatch_threadpool(tp, dispatch_threadpool_to_me, (void *) 7);




  fprintf(stdout, "**main done second ");




  destroy_threadpool( tp );




  sleep(20);


  exit(-1);


}



GCC 测试成功。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  linux c thread signal list null