您的位置:首页 > 编程语言 > C语言/C++

一个Windows C++的线程池的实现

2016-06-02 10:47 375 查看
此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:
http://blog.csdn.net/huyiyang2010/archive/2010/08/10/5801597.aspx
 

ThreadPoolExecutor.h

#ifndef __THREAD_POOL_EXECUTOR__  

#define __THREAD_POOL_EXECUTOR__  

  

#include "Thread.h"  

#include <set>  

#include <list>  

#include <windows.h>  

  

class CThreadPoolExecutor  

{  

public:  

    CThreadPoolExecutor(void);  

    ~CThreadPoolExecutor(void);  

  

    /** 

      初始化线程池,创建minThreads个线程 

    **/  

    bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);  

  

    /** 

      执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 

      若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true 

      若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false 

    **/  

    bool Execute(Runnable * pRunnable);  

  

    /** 

      终止线程池,先制止塞入任务, 

      然后等待直到任务列表为空, 

      然后设置最小线程数量为0, 

      等待直到线程数量为空, 

      清空垃圾堆中的任务 

    **/  

    void Terminate();  

  

    /** 

      返回线程池中当前的线程数量 

    **/  

    unsigned int GetThreadPoolSize();  

  

private:  

    /** 

      获取任务列表中的任务,若任务列表为空,返回NULL 

    **/  

    Runnable * GetTask();  

  

    static unsigned int WINAPI StaticThreadFunc(void * arg);  

  

private:  

    class CWorker : public CThread  

    {  

    public:  

        CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);  

        ~CWorker();  

        void Run();  

  

    private:  

        CThreadPoolExecutor * m_pThreadPool;  

        Runnable * m_pFirstTask;  

        volatile bool m_bRun;  

    };  

  

    typedef std::set<CWorker *> ThreadPool;  

    typedef std::list<Runnable *> Tasks;  

    typedef Tasks::iterator TasksItr;  

    typedef ThreadPool::iterator ThreadPoolItr;  

  

    ThreadPool m_ThreadPool;  

    ThreadPool m_TrashThread;  

    Tasks m_Tasks;  

  

    CRITICAL_SECTION m_csTasksLock;  

    CRITICAL_SECTION m_csThreadPoolLock;  

  

    volatile bool m_bRun;  

    volatile bool m_bEnableInsertTask;  

    volatile unsigned int m_minThreads;  

    volatile unsigned int m_maxThreads;  

    volatile unsigned int m_maxPendingTasks;  

};  

  

#endif  

 

 

ThreadPoolExecutor.cpp

#include "ThreadPoolExecutor.h"  

  

CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :   

m_pThreadPool(pThreadPool),  

m_pFirstTask(pFirstTask),  

m_bRun(true)  

{  

      

}  

  

CThreadPoolExecutor::CWorker::~CWorker()  

{  

}  

  

/** 

  执行任务的工作线程。 

  当前没有任务时, 

  如果当前线程数量大于最小线程数量,减少线程, 

  否则,执行清理程序,将线程类给释放掉 

**/  

void CThreadPoolExecutor::CWorker::Run()  

{  

    Runnable * pTask = NULL;  

    while(m_bRun)  

    {  

        if(NULL == m_pFirstTask)  

        {  

            pTask = m_pThreadPool->GetTask();  

        }  

        else  

        {  

            pTask = m_pFirstTask;  

            m_pFirstTask = NULL;  

        }  

  

        if(NULL == pTask)  

        {  

            EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  

            if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)  

            {  

                ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);  

                if(itr != m_pThreadPool->m_ThreadPool.end())  

                {  

                    m_pThreadPool->m_ThreadPool.erase(itr);  

                    m_pThreadPool->m_TrashThread.insert(this);  

                }  

                m_bRun = false;  

            }  

            else  

            {  

                ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();  

                while(itr != m_pThreadPool->m_TrashThread.end())  

                {  

                    (*itr)->Join();  

                    delete (*itr);  

                    m_pThreadPool->m_TrashThread.erase(itr);  

                    itr = m_pThreadPool->m_TrashThread.begin();  

                }  

            }  

            LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  

            continue;  

        }  

        else  

        {  

            pTask->Run();  

            pTask = NULL;  

        }  

    }  

}  

  

/////////////////////////////////////////////////////////////////////////////////////////////  

  

CThreadPoolExecutor::CThreadPoolExecutor(void) :   

m_bRun(false),  

m_bEnableInsertTask(false)  

{  

    InitializeCriticalSection(&m_csTasksLock);  

    InitializeCriticalSection(&m_csThreadPoolLock);  

}  

  

CThreadPoolExecutor::~CThreadPoolExecutor(void)  

{  

    Terminate();  

    DeleteCriticalSection(&m_csTasksLock);  

    DeleteCriticalSection(&m_csThreadPoolLock);  

}  

  

bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)  

{  

    if(minThreads == 0)  

    {  

        return false;  

    }  

    if(maxThreads < minThreads)  

    {  

        return false;  

    }  

    m_minThreads = minThreads;  

    m_maxThreads = maxThreads;  

    m_maxPendingTasks = maxPendingTasks;  

    unsigned int i = m_ThreadPool.size();  

    for(; i<minThreads; i++)  

    {  

        //创建线程  

        CWorker * pWorker = new CWorker(this);  

        if(NULL == pWorker)  

        {  

            return false;  

        }  

        EnterCriticalSection(&m_csThreadPoolLock);  

        m_ThreadPool.insert(pWorker);  

        LeaveCriticalSection(&m_csThreadPoolLock);  

        pWorker->Start();  

    }  

    m_bRun = true;  

    m_bEnableInsertTask = true;  

    return true;  

}  

  

bool CThreadPoolExecutor::Execute(Runnable * pRunnable)  

{  

    if(!m_bEnableInsertTask)  

    {  

        return false;  

    }  

    if(NULL == pRunnable)  

    {  

        return false;  

    }  

    if(m_Tasks.size() >= m_maxPendingTasks)  

    {  

        if(m_ThreadPool.size() < m_maxThreads)  

        {  

            CWorker * pWorker = new CWorker(this, pRunnable);  

            if(NULL == pWorker)  

            {  

                return false;  

            }  

            EnterCriticalSection(&m_csThreadPoolLock);  

            m_ThreadPool.insert(pWorker);  

            LeaveCriticalSection(&m_csThreadPoolLock);  

            pWorker->Start();  

        }  

        else  

        {  

            return false;  

        }  

    }  

    else  

    {  

        EnterCriticalSection(&m_csTasksLock);  

        m_Tasks.push_back(pRunnable);  

        LeaveCriticalSection(&m_csTasksLock);  

    }  

    return true;  

}  

  

Runnable * CThreadPoolExecutor::GetTask()  

{  

    Runnable * Task = NULL;  

    EnterCriticalSection(&m_csTasksLock);  

    if(!m_Tasks.empty())  

    {  

        Task = m_Tasks.front();  

        m_Tasks.pop_front();  

    }  

    LeaveCriticalSection(&m_csTasksLock);  

    return Task;  

}  

  

unsigned int CThreadPoolExecutor::GetThreadPoolSize()  

{  

    return m_ThreadPool.size();  

}  

  

void CThreadPoolExecutor::Terminate()  

{  

    m_bEnableInsertTask = false;  

    while(m_Tasks.size() > 0)  

    {  

        Sleep(1);  

    }  

    m_bRun = false;  

    m_minThreads = 0;  

    m_maxThreads = 0;  

    m_maxPendingTasks = 0;  

    while(m_ThreadPool.size() > 0)  

    {  

        Sleep(1);  

    }  

    EnterCriticalSection(&m_csThreadPoolLock);  

    ThreadPoolItr itr = m_TrashThread.begin();  

    while(itr != m_TrashThread.end())  

    {  

        (*itr)->Join();  

        delete (*itr);  

        m_TrashThread.erase(itr);  

        itr = m_TrashThread.begin();  

    }  

    LeaveCriticalSection(&m_csThreadPoolLock);  

}  

 

 

用法:

#include "Thread.h"

#include "ThreadPoolExecutor.h"

class R : public Runnable

{

public:

    ~R()

    {

    }

    void Run()

    {

        printf("Hello World/n");

    }

};

int _tmain(int argc, _TCHAR* argv[])

{

    CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();

    pExecutor->Init(1, 10, 50);

    R r;

    for(int i=0;i<100;i++)

    {

        while(!pExecutor->Execute(&r))

        {

        }

    }

    pExecutor->Terminate();

    delete pExecutor;

    getchar();

    return 0;

}

 

测试结果:

机器:

Intel(R) Core(TM)2 Duo CPU

E8400 @ 3.00GHz

2G内存

 

对于100个任务并且每个任务包含10000000个循环,任务中无等待:

单线程执行耗时:2281时间片

单线程池执行耗时:2219时间片

2个线程的线程池耗时:1156时间片

5个线程的线程池耗时:1166时间片

10个线程的线程池耗时:1157时间片

100个线程的线程池耗时:1177时间片

 

 

from:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: