您的位置:首页 > 编程语言 > C语言/C++

一个Windows C++的线程池类实现

2017-10-26 17:49 330 查看
1.首先是线程类:

Thread.h

[cpp] view
plain copy

#ifndef __THREAD_H__

#define __THREAD_H__

#include <string>

#include <windows.h>

#include <process.h>

/**
 * Interface for a unit of work that can be executed by a thread.
 */
class Runnable
{
public:
    /** Virtual so implementations can be destroyed through a Runnable pointer. */
    virtual ~Runnable() {}

    /** Performs the work; every concrete task must implement this. */
    virtual void Run() = 0;
};

/**
 * Thin wrapper around a Win32 thread created with _beginthreadex.
 *
 * The work to perform is supplied either by passing a Runnable to a
 * constructor or by deriving from CThread and overriding Run().
 */
class CThread : public Runnable
{
private:
    // Declared private so CThread objects cannot be copy-constructed by clients.
    explicit CThread(const CThread & rhs);

public:
    CThread();
    CThread(Runnable * pRunnable);
    CThread(const char * ThreadName, Runnable * pRunnable = NULL);
    CThread(std::string ThreadName, Runnable * pRunnable = NULL);
    ~CThread();

    /**
     * Starts the thread.
     * @param bSuspend when true the thread is created suspended
     *                 (CREATE_SUSPENDED) and must be released with Resume().
     * @return true if the thread was created or was already marked running.
     */
    bool Start(bool bSuspend = false);

    /**
     * Thread body. The default implementation runs the attached Runnable,
     * if any; derived classes may override it.
     */
    virtual void Run();

    /**
     * Blocks the calling thread until this thread ends.
     * @param timeout maximum wait passed to WaitForSingleObject; any value
     *                less than or equal to zero waits without a time limit.
     */
    void Join(int timeout = -1);

    /** Resumes a suspended thread. */
    void Resume();

    /** Suspends the thread. */
    void Suspend();

    /**
     * Forcibly ends the thread via TerminateThread.
     * @param ExitCode exit code reported for the terminated thread.
     * @return false only when TerminateThread itself fails.
     */
    bool Terminate(unsigned long ExitCode);

    /** Returns the thread identifier recorded by Start(). */
    unsigned int GetThreadID();

    /** Returns a copy of the thread's name. */
    std::string GetThreadName();

    /** Sets the thread's name. */
    void SetThreadName(std::string ThreadName);

    /** Sets the thread's name; a NULL pointer yields an empty name. */
    void SetThreadName(const char * ThreadName);

private:
    // Entry point handed to _beginthreadex; arg is the CThread to run.
    static unsigned int WINAPI StaticThreadFunc(void * arg);

private:
    HANDLE m_handle;                // handle returned by _beginthreadex
    Runnable * const m_pRunnable;   // optional task executed by Run(); not deleted by this class
    unsigned int m_ThreadID;        // identifier written by _beginthreadex
    std::string m_ThreadName;
    volatile bool m_bRun;           // true while the thread is considered running
};

#endif

Thread.cpp

[cpp] view
plain copy

#include "Thread.h"

/**
 * Creates an unnamed thread object with no attached Runnable.
 *
 * m_handle and m_ThreadID are given defined values so that Join(), Resume(),
 * Suspend(), Terminate() and GetThreadID() behave predictably when they are
 * called before Start().
 */
CThread::CThread(void) :
    m_handle(NULL),
    m_pRunnable(NULL),
    m_ThreadID(0),
    m_bRun(false)
{
}

/**
 * Destroys the thread object.
 *
 * Performs no cleanup of its own: the thread is neither joined nor
 * terminated, and the handle stored by Start() is not closed here.
 */
CThread::~CThread()
{
}

/**
 * Creates an unnamed thread object that will execute pRunnable.
 *
 * @param pRunnable task run by the default Run(); may be NULL. The pointer
 *                  is stored, not owned.
 *
 * Members are initialized in declaration order, and m_handle / m_ThreadID
 * receive defined values so the object is safe to query before Start().
 */
CThread::CThread(Runnable * pRunnable) :
    m_handle(NULL),
    m_pRunnable(pRunnable),
    m_ThreadID(0),
    m_ThreadName(""),
    m_bRun(false)
{
}

/**
 * Creates a named thread object.
 *
 * @param ThreadName C string used as the thread's name; NULL is treated as
 *                   an empty name (constructing std::string from a null
 *                   pointer is undefined behaviour), matching
 *                   SetThreadName(const char *).
 * @param pRunnable  task run by the default Run(); may be NULL. The pointer
 *                   is stored, not owned.
 *
 * m_handle and m_ThreadID receive defined values so the object is safe to
 * query before Start().
 */
CThread::CThread(const char * ThreadName, Runnable * pRunnable) :
    m_handle(NULL),
    m_pRunnable(pRunnable),
    m_ThreadID(0),
    m_ThreadName(NULL != ThreadName ? ThreadName : ""),
    m_bRun(false)
{
}

/**
 * Creates a named thread object.
 *
 * @param ThreadName name given to the thread.
 * @param pRunnable  task run by the default Run(); may be NULL. The pointer
 *                   is stored, not owned.
 *
 * m_handle and m_ThreadID receive defined values so the object is safe to
 * query before Start().
 */
CThread::CThread(std::string ThreadName, Runnable * pRunnable) :
    m_handle(NULL),
    m_pRunnable(pRunnable),
    m_ThreadID(0),
    m_ThreadName(ThreadName),
    m_bRun(false)
{
}

bool CThread::Start(bool bSuspend)

{

if(m_bRun)

{

return true;

}

if(bSuspend)

{

m_handle = (HANDLE)_beginthreadex(NULL, 0, StaticThreadFunc, this, CREATE_SUSPENDED, &m_ThreadID);

}

else

{

m_handle = (HANDLE)_beginthreadex(NULL, 0, StaticThreadFunc, this, 0, &m_ThreadID);

}

m_bRun = (NULL != m_handle);

return m_bRun;

}

/**
 * Default thread body: executes the attached Runnable (if there is one) and
 * then clears the running flag. Does nothing when the object is not marked
 * as running.
 */
void CThread::Run()
{
    if(m_bRun)
    {
        if(m_pRunnable != NULL)
        {
            m_pRunnable->Run();
        }

        // The work is done; the thread is no longer considered running.
        m_bRun = false;
    }
}

void CThread::Join(int timeout)

{

if(NULL == m_handle || !m_bRun)

{

return;

}

if(timeout <= 0)

{

timeout = INFINITE;

}

::WaitForSingleObject(m_handle, timeout);

}

/**
 * Calls ResumeThread on the thread's handle. No-op when there is no handle
 * or the thread is not marked as running.
 */
void CThread::Resume()
{
    if(m_handle != NULL && m_bRun)
    {
        ::ResumeThread(m_handle);
    }
}

/**
 * Calls SuspendThread on the thread's handle. No-op when there is no handle
 * or the thread is not marked as running.
 */
void CThread::Suspend()
{
    if(m_handle != NULL && m_bRun)
    {
        ::SuspendThread(m_handle);
    }
}

/**
 * Forcibly ends the thread with TerminateThread and releases its handle.
 *
 * @param ExitCode exit code reported for the terminated thread.
 * @return true if the thread was terminated, or if there was nothing to
 *         terminate (no handle / not marked as running); false if
 *         TerminateThread failed.
 */
bool CThread::Terminate(unsigned long ExitCode)
{
    if(NULL == m_handle || !m_bRun)
    {
        return true;
    }

    if(!::TerminateThread(m_handle, ExitCode))
    {
        return false;
    }

    ::CloseHandle(m_handle);

    // Forget the closed handle and clear the running flag so that later
    // Join()/Resume()/Suspend()/Terminate() calls do not operate on a
    // handle value that is no longer valid.
    m_handle = NULL;
    m_bRun = false;

    return true;
}

unsigned int CThread::GetThreadID()

{

return m_ThreadID;

}

/** Returns a copy of the name currently assigned to the thread. */
std::string CThread::GetThreadName()
{
    std::string nameCopy(m_ThreadName);
    return nameCopy;
}

/**
 * Replaces the thread's name.
 * @param ThreadName new name.
 */
void CThread::SetThreadName(std::string ThreadName)
{
    m_ThreadName.assign(ThreadName);
}

/**
 * Replaces the thread's name from a C string.
 * @param ThreadName new name; a NULL pointer results in an empty name.
 */
void CThread::SetThreadName(const char * ThreadName)
{
    m_ThreadName = (ThreadName != NULL) ? ThreadName : "";
}

unsigned int CThread::StaticThreadFunc(void * arg)

{

CThread * pThread = (CThread *)arg;

pThread->Run();

return 0;

}

2. 然后是线程池类的实现:

ThreadPoolExecutor.h

[cpp] view
plain copy

#ifndef __THREAD_POOL_EXECUTOR__

#define __THREAD_POOL_EXECUTOR__

#include "Thread.h"

#include <set>

#include <list>

#include <windows.h>

/**
 * Thread pool that executes Runnable tasks on a set of worker threads.
 *
 * Pending tasks are kept in a list protected by m_csTasksLock; the worker
 * sets are protected by m_csThreadPoolLock. Tasks are referenced by pointer
 * only; the pool never deletes a Runnable.
 */
class CThreadPoolExecutor
{
public:
    CThreadPoolExecutor(void);
    ~CThreadPoolExecutor(void);

    /**
     * Initializes the pool and creates workers until it holds minThreads.
     * @param minThreads      number of workers kept alive while idle; must be non-zero.
     * @param maxThreads      upper bound on workers; must be >= minThreads.
     * @param maxPendingTasks capacity of the pending-task list.
     * @return false if the arguments are rejected, true otherwise.
     */
    bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks);

    /**
     * Submits a task.
     *  - If the pending list is not full, the task is appended: returns true.
     *  - If the list is full and fewer than maxThreads workers exist, a new
     *    worker is created to run this task: returns true.
     *  - If the list is full and maxThreads workers exist, the task is
     *    dropped: returns false.
     */
    bool Execute(Runnable * pRunnable);

    /**
     * Shuts the pool down: stops accepting tasks, waits until the pending
     * list is empty, sets the minimum thread count to zero, waits until no
     * workers remain in the pool, then joins and deletes the retired workers.
     */
    void Terminate();

    /** Returns the number of workers currently in the pool. */
    unsigned int GetThreadPoolSize();

private:
    // The pool owns critical sections and raw worker pointers, so copying it
    // is never meaningful; declared private and left unimplemented.
    CThreadPoolExecutor(const CThreadPoolExecutor &);
    CThreadPoolExecutor & operator=(const CThreadPoolExecutor &);

    /** Removes and returns the next pending task, or NULL if the list is empty. */
    Runnable * GetTask();

    // NOTE(review): declared here, but no definition appears in the
    // accompanying implementation file as shown.
    static unsigned int WINAPI StaticThreadFunc(void * arg);

private:
    /**
     * Worker thread: runs an optional first task, then keeps taking tasks
     * from the owning pool.
     */
    class CWorker : public CThread
    {
    public:
        CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);
        ~CWorker();

        void Run();

    private:
        CThreadPoolExecutor * m_pThreadPool;   // owning pool
        Runnable * m_pFirstTask;               // task to run before polling the list; may be NULL
        volatile bool m_bRun;                  // worker loop flag (separate from CThread's own flag)
    };

    typedef std::set<CWorker *> ThreadPool;
    typedef std::list<Runnable *> Tasks;
    typedef Tasks::iterator TasksItr;
    typedef ThreadPool::iterator ThreadPoolItr;

    ThreadPool m_ThreadPool;                   // active workers
    ThreadPool m_TrashThread;                  // retired workers waiting to be joined and deleted
    Tasks m_Tasks;                             // pending tasks, taken from the front

    CRITICAL_SECTION m_csTasksLock;            // guards m_Tasks
    CRITICAL_SECTION m_csThreadPoolLock;       // guards m_ThreadPool and m_TrashThread

    volatile bool m_bRun;
    volatile bool m_bEnableInsertTask;         // when false, Execute() rejects every task
    volatile unsigned int m_minThreads;
    volatile unsigned int m_maxThreads;
    volatile unsigned int m_maxPendingTasks;
};

#endif

ThreadPoolExecutor.cpp

[cpp] view
plain copy

#include "ThreadPoolExecutor.h"

/**
 * Creates a worker bound to a pool.
 *
 * @param pThreadPool pool whose task list this worker will service.
 * @param pFirstTask  task to execute before polling the pool; may be NULL.
 *
 * The worker's own loop flag starts out true.
 */
CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask)
    : m_pThreadPool(pThreadPool)
    , m_pFirstTask(pFirstTask)
    , m_bRun(true)
{
}

/** Destroys the worker; it owns neither the pool nor any task, so there is nothing to release. */
CThreadPoolExecutor::CWorker::~CWorker()
{
}

/**

执行任务的工作线程。

当前没有任务时,

如果当前线程数量大于最小线程数量,减少线程,

否则,执行清理程序,将线程类给释放掉

**/

void CThreadPoolExecutor::CWorker::Run()

{

Runnable * pTask = NULL;

while(m_bRun)

{

if(NULL == m_pFirstTask)

{

pTask = m_pThreadPool->GetTask();

}

else

{

pTask = m_pFirstTask;

m_pFirstTask = NULL;

}

if(NULL == pTask)

{

EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));

if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)

{

ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);

if(itr != m_pThreadPool->m_ThreadPool.end())

{

m_pThreadPool->m_ThreadPool.erase(itr);

m_pThreadPool->m_TrashThread.insert(this);

}

m_bRun = false;

}

else

{

ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();

while(itr != m_pThreadPool->m_TrashThread.end())

{

(*itr)->Join();

delete (*itr);

m_pThreadPool->m_TrashThread.erase(itr);

itr = m_pThreadPool->m_TrashThread.begin();

}

}

LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));

continue;

}

else

{

pTask->Run();

pTask = NULL;

}

}

}

/////////////////////////////////////////////////////////////////////////////////////////////

/**
 * Creates an empty, uninitialized pool.
 *
 * The pool rejects tasks until Init() succeeds. The thread and queue limits
 * start at zero so they hold defined values before Init() is called.
 */
CThreadPoolExecutor::CThreadPoolExecutor(void) :
    m_bRun(false),
    m_bEnableInsertTask(false),
    m_minThreads(0),
    m_maxThreads(0),
    m_maxPendingTasks(0)
{
    InitializeCriticalSection(&m_csTasksLock);
    InitializeCriticalSection(&m_csThreadPoolLock);
}

/**
 * Shuts the pool down via Terminate() and then releases both critical
 * sections. Terminate() must come first because it uses the locks.
 */
CThreadPoolExecutor::~CThreadPoolExecutor()
{
    Terminate();

    DeleteCriticalSection(&m_csTasksLock);
    DeleteCriticalSection(&m_csThreadPoolLock);
}

bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)

{

if(minThreads == 0)

{

return false;

}

if(maxThreads < minThreads)

{

return false;

}

m_minThreads = minThreads;

m_maxThreads = maxThreads;

m_maxPendingTasks = maxPendingTasks;

unsigned int i = m_ThreadPool.size();

for(; i<minThreads; i++)

{

//创建线程

CWorker * pWorker = new CWorker(this);

if(NULL == pWorker)

{

return false;

}

EnterCriticalSection(&m_csThreadPoolLock);

m_ThreadPool.insert(pWorker);

LeaveCriticalSection(&m_csThreadPoolLock);

pWorker->Start();

}

m_bRun = true;

m_bEnableInsertTask = true;

return true;

}

/**
 * Submits a task for execution.
 *
 * If the pending list has room the task is queued. If the list is full and
 * the pool holds fewer than m_maxThreads workers, a new worker is created
 * with this task as its first task. Otherwise the task is rejected.
 *
 * @param pRunnable task to run; not owned by the pool. NULL is rejected.
 * @return true if the task was queued or handed to a new worker; false if
 *         submission is disabled, pRunnable is NULL, the pool is saturated,
 *         or the new worker's thread could not be started.
 */
bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
{
    if(!m_bEnableInsertTask || NULL == pRunnable)
    {
        return false;
    }

    // The capacity check and the insertion happen under the same lock:
    // workers pop from m_Tasks concurrently, so reading its size unlocked
    // is a data race.
    bool queued = false;
    EnterCriticalSection(&m_csTasksLock);
    if(m_Tasks.size() < m_maxPendingTasks)
    {
        m_Tasks.push_back(pRunnable);
        queued = true;
    }
    LeaveCriticalSection(&m_csTasksLock);

    if(queued)
    {
        return true;
    }

    // Pending list is full: try to grow the pool. The two locks are never
    // held at the same time, so no lock-ordering issue is introduced.
    CWorker * pWorker = NULL;
    EnterCriticalSection(&m_csThreadPoolLock);
    if(m_ThreadPool.size() < m_maxThreads)
    {
        pWorker = new CWorker(this, pRunnable);
        m_ThreadPool.insert(pWorker);
    }
    LeaveCriticalSection(&m_csThreadPoolLock);

    if(NULL == pWorker)
    {
        // Pool already at its maximum size: the task is dropped.
        return false;
    }

    if(!pWorker->Start())
    {
        // The thread never started, so the task would silently be lost;
        // undo the registration and report the failure to the caller.
        EnterCriticalSection(&m_csThreadPoolLock);
        m_ThreadPool.erase(pWorker);
        LeaveCriticalSection(&m_csThreadPoolLock);

        delete pWorker;
        return false;
    }

    return true;
}

/**
 * Takes the oldest pending task off the list while holding m_csTasksLock.
 *
 * @return the task at the front of the list, or NULL when the list is empty.
 */
Runnable * CThreadPoolExecutor::GetTask()
{
    Runnable * pNext = NULL;

    EnterCriticalSection(&m_csTasksLock);
    const bool hasWork = !m_Tasks.empty();
    if(hasWork)
    {
        pNext = m_Tasks.front();
        m_Tasks.pop_front();
    }
    LeaveCriticalSection(&m_csTasksLock);

    return pNext;
}

unsigned int CThreadPoolExecutor::GetThreadPoolSize()

{

return m_ThreadPool.size();

}

void CThreadPoolExecutor::Terminate()

{

m_bEnableInsertTask = false;

while(m_Tasks.size() > 0)

{

Sleep(1);

}

m_bRun = false;

m_minThreads = 0;

m_maxThreads = 0;

m_maxPendingTasks = 0;

while(m_ThreadPool.size() > 0)

{

Sleep(1);

}

EnterCriticalSection(&m_csThreadPoolLock);

ThreadPoolItr itr = m_TrashThread.begin();

while(itr != m_TrashThread.end())

{

(*itr)->Join();

delete (*itr);

m_TrashThread.erase(itr);

itr = m_TrashThread.begin();

}

LeaveCriticalSection(&m_csThreadPoolLock);

}

用法:

#include "Thread.h"
#include "ThreadPoolExecutor.h"

/** Sample task: prints a greeting each time a pool thread runs it. */
class R : public Runnable
{
public:
    ~R()
    {
    }

    void Run()
    {
        // Note the backslash: "/n" would print a slash and an 'n' instead of
        // ending the line.
        printf("Hello World\n");
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
pExecutor->Init(1, 10, 50);
R r;
for(int i=0;i<100;i++)
{
while(!pExecutor->Execute(&r))
{
}
}
pExecutor->Terminate();
delete pExecutor;
getchar();
return 0;
}


本文转载自:http://blog.csdn.net/huyiyang2010/article/details/5809919
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: