您的位置:首页 > 编程语言 > C语言/C++

转载一份C++线程池的代码,非常实用

2015-10-12 19:42 429 查看
win32线程池代码(WinApi/C++)

健壮, 高效,易用,易于扩, 可用于任何C++编译器 

//说明, 这段代码我用了很久, 我删除了自动调整规模的代码(因为他还不成熟)

/******************************************************************

*  Thread Pool For Win32 

*  VC++ 6, BC++ 5.5(Free), GCC(Free)

*  Update : 2004.6.9 llBird  wushaojian@21cn.com

Use:

1):

void threadfunc(void *p)

{

 //...

}

 ThreadPool tp;

 for(i=0; i<100; i++)

  tp.Call(threadfunc);

 ThreadPool tp(20);//20为初始线程池规模

 tp.Call(threadfunc, lpPara);

 tp.AdjustSize(50);//增加50

 tp.AdjustSize(-30);//减少30

2):

class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展

{

public:

 virtual void DoJob(void *p)//自定义的虚函数

 {

  //....

 }

};

 MyThreadJob mt[10];

 ThreadPool tp;

 for(i=0; i<100 i++)

  tp.Call(mt + i);//tp.Call(mt + i, para);

*******************************************************************/

#ifndef _ThreadPool_H_

#define _ThreadPool_H_

#pragma warning(disable: 4530)

#pragma warning(disable: 4786)

#include <cassert>

#include <vector>

#include <queue>

#include <windows.h>

using namespace std;

class ThreadJob  //工作基类

{

public:

  //供线程池调用的虚函数

  virtual void DoJob(void *pPara) = 0;

};

class ThreadPool

{

public:

  //dwNum 线程池规模

  ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 

  {

    InitializeCriticalSection(&_csThreadVector);

    InitializeCriticalSection(&_csWorkQueue);

    _EventComplete = CreateEvent(0, false, false, NULL);

    _EventEnd = CreateEvent(0, true, false, NULL);

    _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);

    _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);

    assert(_SemaphoreCall != INVALID_HANDLE_VALUE);

    assert(_EventComplete != INVALID_HANDLE_VALUE);

    assert(_EventEnd != INVALID_HANDLE_VALUE);

    assert(_SemaphoreDel != INVALID_HANDLE_VALUE);

    AdjustSize(dwNum <= 0 ? 4 : dwNum);

  }

  ~ThreadPool()

  {

    DeleteCriticalSection(&_csWorkQueue);

    CloseHandle(_EventEnd);

    CloseHandle(_EventComplete);

    CloseHandle(_SemaphoreCall);

    CloseHandle(_SemaphoreDel);

    vector<ThreadItem*>::iterator iter;

    for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)

    {

      if(*iter)

        delete *iter;

    }

    DeleteCriticalSection(&_csThreadVector);

  }

  //调整线程池规模

  int AdjustSize(int iNum)

  {

    if(iNum > 0)

    {

      ThreadItem *pNew;

      EnterCriticalSection(&_csThreadVector);

      for(int _i=0; _i<iNum; _i++)

      {

        _ThreadVector.push_back(pNew = new ThreadItem(this)); 

        assert(pNew);

        pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);

        // set priority

        SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);

        assert(pNew->_Handle);

      }

      LeaveCriticalSection(&_csThreadVector);

    }

    else

    {

      iNum *= -1;

      ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);

    }

    return (int)_lThreadNum;

  }

  //调用线程池

  void Call(void (*pFunc)(void  *), void *pPara = NULL)

  {

    assert(pFunc);

    EnterCriticalSection(&_csWorkQueue);

    _JobQueue.push(new JobItem(pFunc, pPara));

    LeaveCriticalSection(&_csWorkQueue);

    ReleaseSemaphore(_SemaphoreCall, 1, NULL);

  }

  //调用线程池

  inline void Call(ThreadJob * p, void *pPara = NULL)

  {

    Call(CallProc, new CallProcPara(p, pPara));

  }

  //结束线程池, 并同步等待

  bool EndAndWait(DWORD dwWaitTime = INFINITE)

  {

    SetEvent(_EventEnd);

    return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;

  }

  //结束线程池

  inline void End()

  {

    SetEvent(_EventEnd);

  }

  inline DWORD Size()

  {

    return (DWORD)_lThreadNum;

  }

  inline DWORD GetRunningSize()

  {

    return (DWORD)_lRunningNum;

  }

  bool IsRunning()

  {

    return _lRunningNum > 0;

  }

protected:

  //工作线程

  static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)

  {

    ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);

    assert(pThread);

    ThreadPool *pThreadPoolObj = pThread->_pThis;

    assert(pThreadPoolObj);

    InterlockedIncrement(&pThreadPoolObj->_lThreadNum);

    HANDLE hWaitHandle[3];

    hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;

    hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;

    hWaitHandle[2] = pThreadPoolObj->_EventEnd;

    JobItem *pJob;

    bool fHasJob;

    for(;;)

    {

      DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);

      //响应删除线程信号

      if(wr == WAIT_OBJECT_0 + 1)  

        break;

      //从队列里取得用户作业

      EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);

      if(fHasJob = !pThreadPoolObj->_JobQueue.empty())

      {

        pJob = pThreadPoolObj->_JobQueue.front();

        pThreadPoolObj->_JobQueue.pop();

        assert(pJob);

      }

      LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);

      //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)

      if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  

        break;

      if(fHasJob && pJob)

      {

        InterlockedIncrement(&pThreadPoolObj->_lRunningNum);

        pThread->_dwLastBeginTime = GetTickCount();

        pThread->_dwCount++;

        pThread->_fIsRunning = true;

        pJob->_pFunc(pJob->_pPara); //运行用户作业

        delete pJob; 

        pThread->_fIsRunning = false;

        InterlockedDecrement(&pThreadPoolObj->_lRunningNum);

      }

    }

    //删除自身结构

    EnterCriticalSection(&pThreadPoolObj->_csThreadVector);

    pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));

    LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);

    delete pThread;

    InterlockedDecrement(&pThreadPoolObj->_lThreadNum);

    if(!pThreadPoolObj->_lThreadNum)  //所有线程结束

      SetEvent(pThreadPoolObj->_EventComplete);

    return 0;

  }

  //调用用户对象虚函数

  static void CallProc(void *pPara) 

  {

    CallProcPara *cp = static_cast<CallProcPara *>(pPara);

    assert(cp);

    if(cp)

    {

      cp->_pObj->DoJob(cp->_pPara);

      delete cp;

    }

  }

  //用户对象结构

  struct CallProcPara  

  {

    ThreadJob* _pObj;//用户对象 

    void *_pPara;//用户参数

    CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };

  };

  //用户函数结构

  struct JobItem 

  {

    void (*_pFunc)(void  *);//函数

    void *_pPara; //参数

    JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };

  };

  //线程池中的线程结构

  struct ThreadItem

  {

    HANDLE _Handle; //线程句柄

    ThreadPool *_pThis;  //线程池的指针

    DWORD _dwLastBeginTime; //最后一次运行开始时间

    DWORD _dwCount; //运行次数

    bool _fIsRunning;

    ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };

    ~ThreadItem()

    {

      if(_Handle)

      {

        CloseHandle(_Handle);

        _Handle = NULL;

      }

    }

  };

  std::queue<JobItem *> _JobQueue;  //工作队列

  std::vector<ThreadItem *>  _ThreadVector; //线程数据

  CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界

  HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号

  long _lThreadNum, _lRunningNum; //线程数, 运行的线程数

};

#endif //_ThreadPool_H_

转载自 http://blog.csdn.net/pjchen/archive/2004/11/06/170606.aspx

基本上是拿来就用了,对WIN32 API不熟,但对线程池的逻辑还是比较熟的,认为这个线程池写得很清晰,我拿来用在一个多线程下载的模块中。很实用的东东。

调用方法

void threadfunc(void *p)

{

     YourClass* yourObject = (YourClass*)    p;

 //...

}

 ThreadPool tp;

 for(i=0; i<100; i++)

  tp.Call(threadfunc);

ThreadPool tp(20);//20为初始线程池规模

 tp.Call(threadfunc, lpPara);

     

使用时注意几点:

1. ThreadJob  没什么用,直接写线程函数吧。 

2. 线程函数(threadfunc)的入口参数void* 可以转成自定义的类型对象,这个对象可以记录下线程运行中的数据,并设置线程当前状态,以此与线程进行交互。

3. 线程池有一个EndAndWait函数,用于让线程池中所有计算正常结束。有时线程池中的一个线程可能要运行很长时间,怎么办?可以通过线程函数threadfunc的入口参数对象来处理,比如:

class YourClass {

  int cmd; // cmd = 1是上线程停止计算,正常退出。

};

threadfunc(void* p) {

  YourClass* yourObject = (YourClass*)p;

  while (true) {

    // do some calculation

    if (yourClass->cmd == 1)

      break;

  }

}

在主线程中设置yourClass->cmd = 1,该线程就会自然结束。

很简洁通用的线程池实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  C++ 线程池