您的位置:首页 > 编程语言 > C语言/C++

win32线程池代码(WinApi/C++)(转)

2009-06-11 11:03 197 查看
看到不错的文章,自己留个备份。转载地址:http://blog.csdn.net/pjchen/archive/2004/11/06/170606.aspx

win32线程池代码(WinApi/C++)

健壮, 高效,易用,易于扩, 可用于任何C++编译器

//说明, 这段代码我用了很久, 我删除了自动调整规模的代码(因为他还不成熟)

/******************************************************************

* Thread Pool For Win32

* VC++ 6, BC++ 5.5(Free), GCC(Free)

* Update : 2004.6.9 llBird wushaojian@21cn.com

Use:

1):

void threadfunc(void *p)

{

//...

}

ThreadPool tp;

for(i=0; i<100; i++)

tp.Call(threadfunc);

ThreadPool tp(20);//20为初始线程池规模

tp.Call(threadfunc, lpPara);

tp.AdjustSize(50);//增加50

tp.AdjustSize(-30);//减少30

2):

class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展

{

public:

virtual void DoJob(void *p)//自定义的虚函数

{

//....

}

};

MyThreadJob mt[10];

ThreadPool tp;

for(i=0; i<100 i++)

tp.Call(mt + i);//tp.Call(mt + i, para);

*******************************************************************/

#ifndef _ThreadPool_H_

#define _ThreadPool_H_

#pragma warning(disable: 4530)

#pragma warning(disable: 4786)

#include <cassert>

#include <vector>

#include <queue>

#include <windows.h>

class ThreadJob //工作基类

{

public:

//供线程池调用的虚函数

virtual void DoJob(void *pPara) = 0;

};

class ThreadPool

{

public:

//dwNum 线程池规模

ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)

{

InitializeCriticalSection(&_csThreadVector);

InitializeCriticalSection(&_csWorkQueue);

_EventComplete = CreateEvent(0, false, false, NULL);

_EventEnd = CreateEvent(0, true, false, NULL);

_SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);

_SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);

assert(_SemaphoreCall != INVALID_HANDLE_VALUE);

assert(_EventComplete != INVALID_HANDLE_VALUE);

assert(_EventEnd != INVALID_HANDLE_VALUE);

assert(_SemaphoreDel != INVALID_HANDLE_VALUE);

AdjustSize(dwNum <= 0 ? 4 : dwNum);

}

~ThreadPool()

{

DeleteCriticalSection(&_csWorkQueue);

CloseHandle(_EventEnd);

CloseHandle(_EventComplete);

CloseHandle(_SemaphoreCall);

CloseHandle(_SemaphoreDel);

vector<ThreadItem*>::iterator iter;

for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)

{

if(*iter)

delete *iter;

}

DeleteCriticalSection(&_csThreadVector);

}

//调整线程池规模

int AdjustSize(int iNum)

{

if(iNum > 0)

{

ThreadItem *pNew;

EnterCriticalSection(&_csThreadVector);

for(int _i=0; _i<iNum; _i++)

{

_ThreadVector.push_back(pNew = new ThreadItem(this));

assert(pNew);

pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);

assert(pNew->_Handle);

}

LeaveCriticalSection(&_csThreadVector);

}

else

{

iNum *= -1;

ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL);

}

return (int)_lThreadNum;

}

//调用线程池

void Call(void (*pFunc)(void *), void *pPara = NULL)

{

assert(pFunc);

EnterCriticalSection(&_csWorkQueue);

_JobQueue.push(new JobItem(pFunc, pPara));

LeaveCriticalSection(&_csWorkQueue);

ReleaseSemaphore(_SemaphoreCall, 1, NULL);

}

//调用线程池

inline void Call(ThreadJob * p, void *pPara = NULL)

{

Call(CallProc, new CallProcPara(p, pPara));

}

//结束线程池, 并同步等待

bool EndAndWait(DWORD dwWaitTime = INFINITE)

{

SetEvent(_EventEnd);

return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;

}

//结束线程池

inline void End()

{

SetEvent(_EventEnd);

}

inline DWORD Size()

{

return (DWORD)_lThreadNum;

}

inline DWORD GetRunningSize()

{

return (DWORD)_lRunningNum;

}

bool IsRunning()

{

return _lRunningNum > 0;

}

protected:

//工作线程

static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)

{

ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);

assert(pThread);

ThreadPool *pThreadPoolObj = pThread->_pThis;

assert(pThreadPoolObj);

InterlockedIncrement(&pThreadPoolObj->_lThreadNum);

HANDLE hWaitHandle[3];

hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;

hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;

hWaitHandle[2] = pThreadPoolObj->_EventEnd;

JobItem *pJob;

bool fHasJob;

for(;;)

{

DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);

//响应删除线程信号

if(wr == WAIT_OBJECT_0 + 1)

break;

//从队列里取得用户作业

EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);

if(fHasJob = !pThreadPoolObj->_JobQueue.empty())

{

pJob = pThreadPoolObj->_JobQueue.front();

pThreadPoolObj->_JobQueue.pop();

assert(pJob);

}

LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);

//受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)

if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)

break;

if(fHasJob && pJob)

{

InterlockedIncrement(&pThreadPoolObj->_lRunningNum);

pThread->_dwLastBeginTime = GetTickCount();

pThread->_dwCount++;

pThread->_fIsRunning = true;

pJob->_pFunc(pJob->_pPara); //运行用户作业

delete pJob;

pThread->_fIsRunning = false;

InterlockedDecrement(&pThreadPoolObj->_lRunningNum);

}

}

//删除自身结构

EnterCriticalSection(&pThreadPoolObj->_csThreadVector);

pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(),
pThreadPoolObj->_ThreadVector.end(), pThread));

LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);

delete pThread;

InterlockedDecrement(&pThreadPoolObj->_lThreadNum);

if(!pThreadPoolObj->_lThreadNum) //所有线程结束

SetEvent(pThreadPoolObj->_EventComplete);

return 0;

}

//调用用户对象虚函数

static void CallProc(void *pPara)

{

CallProcPara *cp = static_cast<CallProcPara *>(pPara);

assert(cp);

if(cp)

{

cp->_pObj->DoJob(cp->_pPara);

delete cp;

}

}

//用户对象结构

struct CallProcPara

{

ThreadJob* _pObj;//用户对象

void *_pPara;//用户参数

CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };

};

//用户函数结构

struct JobItem

{

void (*_pFunc)(void *);//函数

void *_pPara; //参数

JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };

};

//线程池中的线程结构

struct ThreadItem

{

HANDLE _Handle; //线程句柄

ThreadPool *_pThis; //线程池的指针

DWORD _dwLastBeginTime; //最后一次运行开始时间

DWORD _dwCount; //运行次数

bool _fIsRunning;

ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };

~ThreadItem()

{

if(_Handle)

{

CloseHandle(_Handle);

_Handle = NULL;

}

}

};

std::queue<JobItem *> _JobQueue; //工作队列

std::vector<ThreadItem *> _ThreadVector; //线程数据

CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界

HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号

long _lThreadNum, _lRunningNum; //线程数, 运行的线程数

};

#endif //_ThreadPool_H_
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: