您的位置:首页 > 其它

一个简单的线程池的实现

2016-08-03 16:45 447 查看
转自http://blog.csdn.net/yongshengfree/article/details/4536695#

线程池的原理大家都知道,直接上代码了^_^

Thread.h

[cpp] view
plain copy

#ifndef __THREAD_H  

#define __THREAD_H  

  

#include <vector>  

#include <string>  

#include <pthread.h>  

  

using namespace std;  

  

/** 

 * 执行任务的类,设置任务数据并执行 

 */  

class CTask  

{  

protected:  

    string m_strTaskName;  /** 任务的名称 */  

    void* m_ptrData;       /** 要执行的任务的具体数据 */  

public:  

    CTask(){}  

    CTask(string taskName)  

    {  

        m_strTaskName = taskName;  

        m_ptrData = NULL;  

    }  

    virtual int Run()= 0;  

    void SetData(void* data);    /** 设置任务数据 */  

  

public:  

    virtual ~CTask(){}  

};  

  

/** 

 * 线程池管理类的实现 

 */  

class CThreadPool  

{  

private:  

    static  vector<CTask*> m_vecTaskList;     /** 任务列表 */  

    static  bool shutdown;                    /** 线程退出标志 */           

    int     m_iThreadNum;                     /** 线程池中启动的线程数 */  

    pthread_t   *pthread_id;  

      

    static pthread_mutex_t m_pthreadMutex;    /** 线程同步锁 */  

    static pthread_cond_t m_pthreadCond;      /** 线程同步的条件变量 */  

  

protected:  

    static void* ThreadFunc(void * threadData); /** 新线程的线程回调函数 */  

    static int MoveToIdle(pthread_t tid);       /** 线程执行结束后,把自己放入到空闲线程中 */  

    static int MoveToBusy(pthread_t tid);       /** 移入到忙碌线程中去 */  

      

    int Create();          /** 创建线程池中的线程 */  

  

public:  

    CThreadPool(int threadNum = 10);  

    int AddTask(CTask *task);      /** 把任务添加到任务队列中 */  

    int StopAll();                 /** 使线程池中的线程退出 */  

    int getTaskSize();             /** 获取当前任务队列中的任务数 */  

};  

  

#endif  

 

实现文件

Thread.cpp

[cpp] view
plain copy

#include "Thread.h"  

#include <iostream>  

  

void CTask::SetData(void * data)  

{  

    m_ptrData = data;  

}  

  

vector<CTask*> CThreadPool::m_vecTaskList;         //任务列表  

bool CThreadPool::shutdown = false;  

      

pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;   

pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;  

  

/** 

 * 线程池管理类构造函数 

 */  

CThreadPool::CThreadPool(int threadNum)  

{  

    this->m_iThreadNum = threadNum;  

    cout << "I will create " << threadNum << " threads" << endl;  

    Create();  

}  

  

/** 

 * 线程回调函数 

 */  

void* CThreadPool::ThreadFunc(void* threadData)  

{  

    pthread_t tid = pthread_self();  

    while (1)  

    {  

        pthread_mutex_lock(&m_pthreadMutex);  

        while (m_vecTaskList.size() == 0 && !shutdown)  

        {  

            pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);  

        }  

          

        if (shutdown)  

        {  

            pthread_mutex_unlock(&m_pthreadMutex);  

            printf("thread %lu will exit/n", pthread_self());  

            pthread_exit(NULL);   

        }  

          

        printf("tid %lu run/n", tid);  

        vector<CTask*>::iterator iter = m_vecTaskList.begin();  

          

        /** 

        * 取出一个任务并处理之 

        */  

        CTask* task = *iter;  

        if (iter != m_vecTaskList.end())  

        {  

            task = *iter;  

            m_vecTaskList.erase(iter);  

        }  

          

        pthread_mutex_unlock(&m_pthreadMutex);  

          

        task->Run(); /** 执行任务 */  

        printf("tid:%lu idle/n", tid);  

    }  

    return (void*)0;  

}  

  

/** 

 * 往任务队列里边添加任务并发出线程同步信号 

 */  

int CThreadPool::AddTask(CTask *task)  

{  

    pthread_mutex_lock(&m_pthreadMutex);  

    this->m_vecTaskList.push_back(task);  

    pthread_mutex_unlock(&m_pthreadMutex);  

    pthread_cond_signal(&m_pthreadCond);  

    return 0;  

}  

  

/** 

 * 创建线程 

 */  

int CThreadPool::Create()  

{  

    pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);  

    for(int i = 0; i < m_iThreadNum; i++)  

    {  

        pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);  

    }  

    return 0;  

}  

  

/** 

 * 停止所有线程 

 */  

int CThreadPool::StopAll()  

{  

    /** 避免重复调用 */  

    if (shutdown)  

    {  

        return -1;    

    }  

    printf("Now I will end all threads!!/n");  

    /** 唤醒所有等待线程,线程池要销毁了 */  

    shutdown = true;  

    pthread_cond_broadcast(&m_pthreadCond);  

      

    /** 阻塞等待线程退出,否则就成僵尸了 */  

    for (int i = 0; i < m_iThreadNum; i++)  

    {  

        pthread_join(pthread_id[i], NULL);    

    }  

      

    free(pthread_id);  

    pthread_id = NULL;  

      

    /** 销毁条件变量和互斥体 */  

    pthread_mutex_destroy(&m_pthreadMutex);  

    pthread_cond_destroy(&m_pthreadCond);  

      

    return 0;  

}  

  

/** 

 * 获取当前队列中任务数 

 */  

int CThreadPool::getTaskSize()  

{  

    return m_vecTaskList.size();      

}  

 

main函数文件

[cpp] view
plain copy

#include "Thread.h"  

#include <iostream>  

  

class CMyTask: public CTask  

{  

public:  

    CMyTask(){}  

      

    inline int Run()  

    {  

        printf("%s/n", (char*)this->m_ptrData);  

        sleep(10);  

        return 0;  

    }  

};  

  

int main()  

{  

    CMyTask taskObj;  

      

    char szTmp[] = "this is the first thread running";  

    taskObj.SetData((void*)szTmp);  

    CThreadPool threadPool(10);  

      

    for(int i = 0; i < 20; i++)  

    {  

        threadPool.AddTask(&taskObj);  

    }  

      

    while(1)  

    {  

        printf("there are still %d tasks need to handle/n", threadPool.getTaskSize());  

        if (threadPool.getTaskSize() == 0)  

        {  

            if (threadPool.StopAll() == -1)  

            {     

                printf("Now I will exit from main/n");  

                exit(0);  

            }  

        }  

        sleep(2);  

    }  

      

    return 0;  

}  

CMakeLists.txt
1 cmake_minimum_required(VERSION 2.8)
2
3 project(Thread_pool)
4
5 find_package(Threads)#调用pthread库需要的语句1
6
7 aux_source_directory(. DIR_SRCS)
8
9 add_executable(main ${DIR_SRCS})
10
11 target_link_libraries(main ${CMAKE_THREAD_LIBS_INIT})#调用pthread库需要的语句2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: