您的位置:首页 > 编程语言 > C语言/C++

线程池技术个人理解以及c语言的简单实现

2013-09-14 22:06 766 查看
这几天闲来无事,网上无意中看到了关于线程池的东西,发现挺有意思的,找了挺多资料,研究一下,线程池技术,个人理解,线程池是个集合(概念上的,当然是线程的集合),假设这个集合中有3个线程A , B, C 这三个线程初始化的时候就是等待的状态,等待任务的到来,假设有任务1, 2, 3, 4, 5(任务处理的内容是一样的),线程池会怎么处理呢

①:A会处理1任务(任务其实就是函数),B会处理2任务,C处理3任务

②:当1任务结束之后,A会处理4任务;2任务结束后B会处理5任务,3任务结束后,C会等待(因为没有任务了)③:当所有任务处理完之后,A,B,C都会等待状态,等待新任务的到来上面陈述的是一般简单线程池的具体事例,那么如何实现,下面贴出代码(linux环境)

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <assert.h>
#include <string.h>
typedef void* (*TaskFun)(void *arg);

typedef struct _TskNode{
    TaskFun TaskDmd;        /*task节点任务处理函数*/
    void *arg;              /*传入任务处理函数的参数*/
    struct _TskNode *pPre ; /*前一个任务节点*/
    struct _TskNode *pNext; /*后一个任务节点*/
} TskNode; /*队列节点*/

typedef struct _tskQueueManage{
    int tskCurWaitNum;         /*当前任务队列的任务数量*/
    struct _TskNode *pTskHead; /*当前任务队列的首节点*/
    struct _TskNode *pTskTail; /*当前任务队列的尾节点*/
} TskQueueManage;              /*任务队列描述符*/

typedef struct _threadManage{
    int thdMaxNum;              /*线程池容纳最大线程数量*/
    pthread_t *pth;             /*线程指针*/
    pthread_mutex_t mutex;      /*线程锁*/
    pthread_cond_t  cond;       /*线程条件变量*/
} ThreadManage;                 /*线程描述符*/

typedef struct _thredPoolManage{
    int shutFlag;                    /*线程池摧毁标识*/
    ThreadManage   *pThdManage;      /*线程描述符指针*/
    TskQueueManage *pTskQueueManage; /*任务队列描述符指针*/
} ThdPoolManage;                     /*线程池描述符*/

/*初始化,上述描述符*/
static int  mainDmdInit(int thdMaxNum);
/*线程的创建*/
static void thdPoolCreat();
/*线程池中的线程启动后处理*/
static void threadCreatdmd();
/*任务添加*/
static int  tskAddDmd(TaskFun TaskDmd, void* arg);
/*线程池的销毁*/
static void thdPoolDestroy();
/*线程池描述符指针*/
ThdPoolManage* pThdPoolManage = NULL;
static int  mainDmdInit(int thdMaxNum)
{
    int flag = 0;
	/*线程池描述符的创建*/
    pThdPoolManage = (ThdPoolManage *)malloc(sizeof(struct _thredPoolManage));
    if(pThdPoolManage != NULL){
        /*线程描述符的创建*/
        pThdPoolManage->pThdManage = (ThreadManage *)malloc(sizeof(struct _threadManage));
        if(pThdPoolManage->pThdManage != NULL){
            /*线程互斥锁于条件变量的初始化*/
            pthread_mutex_init(&(pThdPoolManage->pThdManage->mutex), NULL);
            pthread_cond_init(&(pThdPoolManage->pThdManage->cond), NULL);
            /*将线程池中允许的最大线程数赋值给线程池描述符成员*/
            pThdPoolManage->pThdManage->thdMaxNum = thdMaxNum;
            /*线程pthread_t的创建*/
            pThdPoolManage->pThdManage->pth = (pthread_t*)malloc(thdMaxNum*sizeof(pthread_t));
            if(pThdPoolManage->pThdManage->pth != NULL){
                /*工作队列描述符的创建*/
                pThdPoolManage->pTskQueueManage = (TskQueueManage *)malloc(sizeof(struct _tskQueueManage));
                if(pThdPoolManage->pTskQueueManage != NULL){
					/*初始队列工作描述符*/
                    pThdPoolManage->pTskQueueManage->tskCurWaitNum = 0;
                    pThdPoolManage->pTskQueueManage->pTskHead  = NULL;
                    pThdPoolManage->pTskQueueManage->pTskTail  = NULL;
					/*线程池中所有线程的创建*/
                    thdPoolCreat();
                } else {
					/*注意: 如果malloc不成功,一定要free掉之前的malloc申请*/
                    free(pThdPoolManage->pThdManage->pth);
                    free(pThdPoolManage->pThdManage);
                    free(pThdPoolManage);
					/*如果malloc失败,说明错误 flag 赋值为 = 1*/
                    flag = 1;
                    printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
                }
            } else {
                free(pThdPoolManage->pThdManage);
                free(pThdPoolManage);
                flag = 1;
                printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
            }
        } else {
            free(pThdPoolManage);
            flag = 1;
            printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
        }
    } else {
        flag = 1;
        printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
    }
    return flag;
}

static void thdPoolCreat()
{    
    ThreadManage *pThreadManage = NULL;
    int thdNum;
    pThreadManage = pThdPoolManage->pThdManage;
    for(thdNum = 0; thdNum < pThreadManage->thdMaxNum; thdNum++){
        pthread_create(&(pThreadManage->pth[thdNum]), NULL, (void*)threadCreatdmd, NULL);
    }
}

static void threadCreatdmd(void *arg)
{
    /*注意指针赋值之前要初始化为NULL,以免发生后续出现野指针的情况*/
    TskQueueManage* pTskQueueManage = NULL;
    ThreadManage *pThreadManage = NULL;
    TskNode* pCurTsk = NULL;

    printf("threadCreatdmd_creat_success:[ThreadId]=[%x]\n", pthread_self());
    pTskQueueManage = pThdPoolManage->pTskQueueManage;
    pThreadManage = pThdPoolManage->pThdManage;
    while(1){
        /*注意,因为会创建很多个threadCreatdmd函数,由于每个函数都要访问临界代码:即对工作队列的操作,所以必须要枷锁
		  以保证每一个处理函数(threadCreatdmd),在访问工作队列的时候,此工作队列不会被其他的处理函数修改*/
        pthread_mutex_lock(&(pThreadManage->mutex)); 
        /*最开始创建线程池中的线程需要等待的两种情况,即,while循环条件成立的情况, 1.线程池初始化时候(即没添加任务之前), 2.
          工作队列没有任务了(即任务都执行完了)*/
        while((pTskQueueManage->tskCurWaitNum == 0)&&(pThdPoolManage->shutFlag == 0)){
            printf("[ThreadId]=[%x]_waiting... ... ...\n", pthread_self());
			/*这时此线程会在这里阻塞*/
            pthread_cond_wait(&(pThreadManage->cond), &(pThreadManage->mutex));
        }
        if(pThdPoolManage->shutFlag == 1){
            pthread_mutex_unlock(&(pThreadManage->mutex));
            printf("[ThreadId]=[%x]_exit\n", pthread_self());
            pthread_exit(NULL);
        }
        printf("[ThreadId]=[%x]_starting_work!!\n", pthread_self());
        assert(pTskQueueManage->tskCurWaitNum != 0);
        assert(pTskQueueManage->pTskHead != NULL);
        (pTskQueueManage->tskCurWaitNum)--;
		/*取工作队列头部节点*/
        pCurTsk = pTskQueueManage->pTskHead;
		/*取头之后,将新头赋给下个元素*/
        pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;
		/* 注意:如果最后一个元素 这时候 pTskQueueManage->pTskHead 是空,空的话是没有pPre的*/
		if(pTskQueueManage->pTskHead != NULL){
        	pTskQueueManage->pTskHead->pPre = NULL;
		}
        pthread_mutex_unlock(&(pThreadManage->mutex));
		/*执行头部任务节点的任务函数(即上面取出的节点)*/
        (pCurTsk->TaskDmd)(pCurTsk->arg);
        free(pCurTsk);
        pCurTsk = NULL;
    }
}

static int  tskAddDmd(TaskFun TaskDmd, void* arg)
{
    TskNode* pTskNode = NULL;
    TskQueueManage* pTskQueueManage = NULL;
    ThreadManage* pThdManage = NULL;
    int flag = 0;
    pTskQueueManage = pThdPoolManage->pTskQueueManage;
    pThdManage = pThdPoolManage->pThdManage;
    pthread_mutex_lock(&(pThdManage->mutex));
	/*任务添加,创建一个工作节点*/
    pTskNode = (TskNode*)malloc(sizeof(struct _TskNode));
    if(pTskNode != NULL){
    	/*将任务(函数赋值给节点)*/
        pTskNode->TaskDmd = TaskDmd;
        pTskNode->pNext   = NULL;
    	/*赋值参数*/
        pTskNode->arg     = arg;
        if(pTskQueueManage->tskCurWaitNum == 0){
            pTskQueueManage->pTskHead = pTskNode;
			pTskQueueManage->pTskTail = pTskNode;
        } else {
            pTskQueueManage->pTskTail->pNext = pTskNode;
            pTskNode->pPre = pTskQueueManage->pTskTail;
            pTskQueueManage->pTskTail = pTskNode;
        }
        (pTskQueueManage->tskCurWaitNum)++;
    } else {
        flag = 1;
        printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
    }
    pthread_mutex_unlock(&(pThdManage->mutex));
    pthread_cond_signal(&(pThdManage->cond));
    return flag;
}

static void  thdPoolDestroy()
{
    int thdNum;
    pThdPoolManage->shutFlag = 1;
    TskQueueManage *pTskQueueManage = NULL;
    ThreadManage   *pThdManage = NULL;
    TskNode*  pTskNode = NULL;
    pTskQueueManage = pThdPoolManage->pTskQueueManage;
    pThdManage   = pThdPoolManage->pThdManage;
    pthread_cond_broadcast(&(pThdPoolManage->pThdManage->cond));
    for(thdNum = 0; thdNum < pThdManage -> thdMaxNum; thdNum++){
        pthread_join(pThdManage->pth[thdNum], NULL);
    }
    while(pTskQueueManage->pTskHead != NULL){
        pTskNode = pTskQueueManage->pTskHead;
        pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;
        free(pTskNode);
    }
    pthread_mutex_destroy(&(pThdManage->mutex));
    pthread_cond_destroy(&(pThdManage->cond));
    free(pThdPoolManage->pThdManage->pth);
    free(pThdPoolManage->pThdManage);
    free(pThdPoolManage->pTskQueueManage);
    free(pThdPoolManage);
    pThdPoolManage = NULL;
}

void TaskDmd(void *arg)
{
    printf("[ThreadId]=[%x] working on task[%d]\n", pthread_self(), *((int *)arg));
    sleep(1);
}
/*测试代码*/
int main()
{   
    int flag;
    int taskAdd;
    int *taskArg;
    int taskNum = 10;
    int thdMaxNum = 3;
    flag = mainDmdInit(thdMaxNum);
    /*保险起见两秒,因为可能会造成添加任务在线程等待之前执行*/
	sleep(2);
	taskArg = (int *)malloc(sizeof(int) * taskNum);
	memset(taskArg, 0x00, sizeof(int) * taskNum);
    if(flag != 1){
        for(taskAdd = 0; taskAdd < taskNum; taskAdd++){
			taskArg[taskAdd] = taskAdd;
            flag = tskAddDmd((void*)TaskDmd, &(taskArg[taskAdd]));
            if(flag == 1){
                printf("jobAdd error Num=[%d]\n", taskAdd);
            } else {
                printf("jobAdd success Num = [%d]\n", taskAdd);
            }
        }
    } else {
        printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
    }
	sleep(10);
    thdPoolDestroy();
    return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐