您的位置:首页 > 编程语言 > C语言/C++

程序随笔——C++实现的一个线程池

2017-06-10 17:02 507 查看
1.线程池简介

我们知道在线程池是一种多线程处理形式,处理过程中我们将相应的任务提交给线程池,线程池会分配对应的工作线程执行任务或存放在任务队列中,等待执行。

面向对象编程中,创建和销毁对象是需要消耗一定时间的,因为创建一个对象要获取内存资源或者其它更多资源。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。当然线程池也同样适用这种思想。

因为线程的创建和销毁时需要消耗一定的时间的。假设,线程的创建消耗T1,线程执行任务的时间T2,线程的销毁销毁T3。当T1 + T3 > T2时候,使用线程池技术,通过线程的复用,就能提高程序的性能。

2.线程池的作用

需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
3.线程池的实现
我们这里实现的线程池是通过类CThreadPool来实现的。这个线程池类的构造函数如下所示:

CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);

构造函数参数的含义:

corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,默认情况下,在创建了线程池之后,线程池中的线程数为0,,当有任务到来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把达到的任务放到缓存对队列当中。
maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。

CBlockingQueue<Task>:任务队列,用来存放提交到线程池的任务。

我们实现的线程池的大概原理是:在创建线程池之后,线程池中并没有任何工作线程的,当使用线程池提供的execute方法,向线程池提交任务的时候,如果线程池中存在空闲的工作线程,那么就会复用该工作线程,并不会去创建新的工作线程;但是如果没有工作线程或空闲的工作线程,并且当然的工作线程数量小于核心池的大小时候,会创建一个工作线程去执行任务,若当前的工作线程数量达到了核心池的数量,那么就会将任务放入到队列中去;若队列满的情况下,如果没有达到线程池的最大线程数,那么就会将创建新的工作线程去执行任务;若线程数达到了最大的线程数,那么我们是抛出异常,这里没有提供一个拒绝的策略,后续有时间的会处理,目前就向采用抛出异常;并且在当前的工作线程数大于核心池数的时候,会有超时机制,关闭指定某时间的空闲工作线程,直到等于核心池的大小。当然,目前的实现还是有缺陷的,线程是结束了,当时并没释放到资源,目前没想到好的方法。

为了简化线程池的配置,我们提供了一个工厂类来进行线程池的创建。我们的工厂类支持创建三种线程池:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

整个实现的原理就是这样,下面 直接上实现,我们的实现只适用Linux上,并且需要依赖boost库。一些实现是之前写的库。这里只贴出新实现的代码。

1.FactoryThreadPool.h

#ifndef __FACTORY_THREAD_POOL_H__
#define __FACTORY_THREAD_POOL_H__

#include "ThreadPool.h"
#include "LinkedBlockingQueue.h"

//创建线程池的工厂类,一般直接使用工厂类创建所需的线程池
//共提供三种线程池:固定大小线程池、可缓存的线程池、单个后台线程
class CFactoryThreadPool
{
public:
CFactoryThreadPool() { }
~CFactoryThreadPool() { }

//固定大小线程池。每次提交一个任务就创建一个线程池,直到线程池达到
//线程池的最大大小。线程池的大小一旦达到最大值就会保持不变。
CAbstractThreadPool *newFixedThreadPool(int iThreads)
{
CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();

return new CThreadPool(iThreads, iThreads, 0, pBlockingQueue);
}
//可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么
//就会回收部分空闲(60s不执行任务)的线程,当任务数增加是,此线程池
//又可以智能的添加新线程来处理任务,线程池的大小依赖于操作系统能创建
//的最大线程的大小
CAbstractThreadPool *newCachedThreadPool()
{
CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>(1);

return new CThreadPool(0, INT_MAX, 60, pBlockingQueue);
}
//单个后台线程。这个线程池只有一个线程在工作,也就是相当于单线程串行
//执行所有任务。保证所有任务的执行按照提交的顺序。
CAbstractThreadPool * newSingleThreadExecutor()
{
CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();

return new CThreadPool(1, 1, 0, pBlockingQueue);
}
};

#endif	//#ifndef __FACTORY_THREAD_POOL_H__
2.AbstractThreadPool.h

#ifndef __ABSTRACT_THREAD_POOL_H__
#define __ABSTRACT_THREAD_POOL_H__

#include "TaskFuncExecute.h"

//抽象基类
class CAbstractThreadPool
{
public:
CAbstractThreadPool() { }

virtual ~CAbstractThreadPool() { }

virtual void execute(const Task &task) = 0;
virtual void shutdown() = 0;
};

#endif	//#ifndef __ABSTRACT_THREAD_POOL_H__
3.TaskFuncExecute.h

#ifndef __TASK_FUNC_EXECUTE_H__
#define __TASK_FUNC_EXECUTE_H__

#include <boost/function.hpp>

typedef boost::function<void(void)>	Task;

//类CTaskFuncExecute,用来执行boost::function函数对象,里面实现了execute
class CTaskFuncExecute
{
public:
CTaskFuncExecute(const Task &task = nullTask)
:m_TaskFunction(task)
{

}

~CTaskFuncExecute()
{

}

CTaskFuncExecute(const CTaskFuncExecute &Excute)
{
m_TaskFunction = Excute.m_TaskFunction;
}

CTaskFuncExecute &operator=(const CTaskFuncExecute &Excute)
{
m_TaskFunction = Excute.m_TaskFunction;
return *this;
}

void execute()
{
m_TaskFunction();
}

//定义一个空任务,实际上不做什么
static void nullTask(void)
{
//nothing
}

private:
Task m_TaskFunction;
};

#endif	//#ifndef __TASK_FUNC_EXECUTE_H__
4.Thread.h

#ifndef __THREAD_H__
#define __THREAD_H__

#include <pthread.h>
#include <string>
#include <boost/function.hpp>
#include "TaskFuncExecute.h"
#include "Condition.h"

//CThread实现了对Linux中的一些线程操作的封装
class CThread
{
friend void *startThread(void *pObj);

public:
typedef boost::function<void(void)> ThreadFunc;
enum ThreadStatus {RUNNING, EXIT, JOIN};
CThread(const ThreadFunc &Func, const std::string &strName);
virtual ~CThread();

void start(const Task &InitTask);
int join();
void exit(void *rval_ptr);

void setname(std::string &strName);
const std::string &name(void);

pthread_t getSelfId();

ThreadStatus getThreadStatus();

private:
ThreadStatus m_Status;			//线程的状态
pthread_t m_tId;				//线程标识
ThreadFunc m_Func;
CMutexLock m_Mutex;
std::string m_strThreadName;	//线程名
};

#endif	//#ifndef __THREAD_H__
5.thread.cpp

#include "Thread.h"
#include "Logger.h"
#include "Exception.h"

using namespace Log;
using namespace Exception;

//一个辅助类,实现运行boost::function函数对象
struct threadData
{
typedef CThread::ThreadFunc ThreadFunc;
ThreadFunc m_Func;
CThread *m_pThis;
Task m_Task;

threadData(const ThreadFunc &Func, CThread *pThis, const Task &InitTask)
:m_Func(Func)
,m_pThis(pThis)
,m_Task(InitTask)
{

}

void runThreadFunc()
{
try
{
m_Func();
}
catch (std::exception &ex)
{
LOG_FATAL << "runThreadFunc exception : " << ex.what();
}
catch (...)
{
LOG_FATAL << "runThreadFunc unknow exception";
}
}
};

void *startThread(void *pObj)
{
try
{
if (pObj == NULL)
throw CException("startThread parament obj is null");

threadData *pData = static_cast<threadData *>(pObj);
pData->m_Task();
pData->runThreadFunc();
pData->m_pThis->m_Mutex.lock();
pData->m_pThis->m_Status = CThread::EXIT;
delete pData;
pData->m_pThis->m_Mutex.unlock();
}
catch (const CException &ex)
{
LOG_FATAL << "throw exception : " << ex.what();
}
return NULL;
}

CThread::CThread(const ThreadFunc &Func, const std::string &strName)
:m_Status(RUNNING)
,m_tId(0)
,m_Func(Func)
,m_Mutex()
,m_strThreadName(strName)
{

}

CThread::~CThread()
{
if (m_Status != JOIN)
{
::pthread_detach(m_tId);
}
}

void CThread::start(const Task &InitTask)
{
threadData *pData = new threadData(m_Func, this, InitTask);
int iRet = ::pthread_create(&m_tId, NULL, startThread, pData);
if (iRet != 0)
{
//创建线程失败?认为这是个致命错误,会终止程序的运行
LOG_FATAL << "pthread_create false return err " << iRet;
}
LOG_INFO << "create thread : " << m_strThreadName << ",tid = " << m_tId;
}

int CThread::join()
{
m_Mutex.lock();
if (m_Status == JOIN)
{
m_Mutex.unlock();
//重复的调用join,这里需要向日志系统输出错误信息。
LOG_ERROR << "repeat call pthread_join";
return -1;
}
m_Mutex.unlock();
LOG_INFO << "join thread, tid = " << m_tId;
int iRet = ::pthread_join(m_tId, NULL);
m_Mutex.lock();
m_Status = JOIN;
m_Mutex.unlock();

return iRet;
}

void CThread::exit(void *rval_ptr)
{
::pthread_exit(rval_ptr);
}

const std::string &CThread::name(void)
{
CMutexLockPart lock(m_Mutex);
return m_strThreadName;
}

void CThread::setname(std::string &strName)
{
CMutexLockPart lock(m_Mutex);
m_strThreadName = strName;
}

pthread_t CThread::getSelfId()
{
return ::pthread_self();
}

CThread::ThreadStatus CThread::getThreadStatus()
{
CMutexLockPart lock(m_Mutex);
return m_Status;
}
6.ThreadPool.h

#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include <boost/ptr_container/ptr_vector.hpp>
#include "AbstractThreadPool.h"
#include "BlockingQueue.h"
#include "Thread.h"
#include "TaskFuncExecute.h"

//BlockingQueue由使用者申请,在析构程池的时候释放
class CThreadPool	: public CAbstractThreadPool
{
public:

CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);
~CThreadPool();

void execute(const Task &task);
void shutdown();

private:
void allWorker(const Task &task);
bool addBlockingQueue(const Task &task);
void runInWorkerThread();
Task getTask();

enum ThreadPoolStatus {RUNNING, SHUTDOWN};
ThreadPoolStatus m_eStatue;
int m_iCorePoolSize;		//核心池的大小
int m_iMaximumPoolSize;		//最大的大小
int m_iKeepAliveTime;		//空闲线程等待新任务的最长时间.0表示不使用
int m_iCurrentThreadSum;	//当前的工作线程
int m_iIdleThreadSum;		//空闲的工作线程
CMutexLock m_MutexLock;
CCondition m_Condition;
CBlockingQueue<Task> *m_pQueue;
boost::ptr_vector<CThread> m_vecWorker;	//所有的工作线程的集合
};

#endif	//#ifndef __THREAD_POOL_H__
7.ThreadPool.cpp

#include <boost/bind.hpp>
#include "ThreadPool.h"
#include "Exception.h"
#include "Logger.h"

using namespace Exception;
using namespace Log;

CThreadPool::CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue)
:m_eStatue(RUNNING)
,m_iCorePoolSize(corePoolSize)
,m_iMaximumPoolSize(maximumPoolSize)
,m_iKeepAliveTime(keepAliveTime)
,m_iCurrentThreadSum(0)
,m_iIdleThreadSum(0)
,m_MutexLock()
,m_Condition(m_MutexLock)
,m_pQueue(pQueue)
,m_vecWorker()
{
m_vecWorker.clear();
}

CThreadPool::~CThreadPool()
{
m_vecWorker.erase(m_vecWorker.begin(), m_vecWorker.end());
//队列由工厂类创建,这里释放掉
if (m_pQueue != NULL)
delete m_pQueue;	//这里需要释放队列
}

void CThreadPool::execute(const Task &task)
{
//为了简化实现,这里没有提供可选的策略。后续有时间可完善
try
{
if (m_eStatue == RUNNING)
{
m_MutexLock.lock();
if (m_iCorePoolSize == 0 || m_iCurrentThreadSum < m_iCorePoolSize)
{
if (m_iIdleThreadSum != 0)
{
m_MutexLock.unlock();
//有空闲的工作线程,直接放到队列去
(void)addBlockingQueue(task);
}
else
{
m_MutexLock.unlock();
//继续分配工作线程
allWorker(task);
}
}
else
{
if (m_pQueue && m_pQueue->full())
{
m_MutexLock.unlock();
allWorker(task);
}
else
{
m_MutexLock.unlock();
(void)addBlockingQueue(task);
}
}
}
else
{
//线程池处于SHUTDOWN状态,此时如果在提交任务,
//我们会抛出异常,这就是我们使用的默认策略
m_MutexLock.unlock();
throw CException("ThreadPool status SHUTDOWN!");
}
}
catch (const CException &ex)
{
LOG_ERROR << "Throw exception : " << ex.what();
}
}

void CThreadPool::shutdown()
{
m_MutexLock.lock();
m_eStatue = SHUTDOWN;
m_Condition.broadcast();
m_MutexLock.unlock();

for (boost::ptr_vector<CThread>::iterator iter = m_vecWorker.begin();
iter != m_vecWorker.end(); ++iter)
{
iter->join();
}
}

void CThreadPool::runInWorkerThread()
{
try
{
for(;;)
{
m_MutexLock .lock();
if (m_eStatue == SHUTDOWN && m_pQueue && m_pQueue->empty())
{
m_MutexLock.unlock();
return;
}
m_MutexLock.unlock();
Task task = getTask();
if (task == NULL)
return;

task();				//执行任务
}
}
catch (const CException &ex)
{
LOG_ERROR << "runInWorkerThread throw exeception " << ex.what();
}
catch (...)
{
LOG_ERROR << "runInWorkerThread unknow error";
}
}

Task CThreadPool::getTask()
{
CMutexLockPart lock(m_MutexLock);
while (m_pQueue && m_pQueue->empty() && m_eStatue == RUNNING)
{
++m_iIdleThreadSum;
if (m_iKeepAliveTime == 0 || (m_iCorePoolSize && m_iCurrentThreadSum < m_iCorePoolSize))
{
m_Condition.wait();
}
else
{
bool bRet = m_Condition.waitForSeconds(m_iKeepAliveTime);
if (bRet == true)
{
--m_iIdleThreadSum;
return NULL;
}
}
--m_iIdleThreadSum;
}

if (m_pQueue->empty() && m_eStatue == SHUTDOWN)
return NULL;

Task task = m_pQueue->poll();

if (m_pQueue && (m_pQueue->size() > 0))
m_Condition.signal();

return task;
}

void CThreadPool::allWorker(const Task &task)
{
try
{
CMutexLockPart lock(m_MutexLock);

if (m_iCurrentThreadSum >= m_iMaximumPoolSize)
throw CException("Current Threads out of Max Threads");

CThread *pWorkerThread = new CThread(boost::bind(&CThreadPool::runInWorkerThread, this), "Worker Thread");
pWorkerThread->start(task);
m_vecWorker.push_back(pWorkerThread);
++m_iCurrentThreadSum;
}
catch (const CException &ex)
{
LOG_ERROR << "ThreadPool allWorker throw exception " << ex.what();
}
}

bool CThreadPool::addBlockingQueue(const Task &task)
{
try
{
LOG_INFO << "addBlockingQueue";
CMutexLockPart lock(m_MutexLock);
if (m_pQueue && m_pQueue->full())
return false;

if (m_pQueue && !m_pQueue->offer(task))
throw CException("ThreadPool add BlockingQueue false");

m_Condition.signal();
return true;
}
catch (const CException &ex)
{
//这种情况应该是不会出现的,如果出现了,进行日志记录。继续运行
LOG_ERROR << "ThreadPool addBlockingQueue throw exception " << ex.what();
return true;
}
}
8.mainFixedThreadPool.cpp
#include "FactoryThreadPool.h"
#include "Logger.h"
#include <stdio.h>
#include <unistd.h>
#include <boost/bind.hpp>

using namespace Log;

//一个计算两个数之和的任务
void Add(int iValue1, int iValue2)
{
LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
//sleep(1);
}

//这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的
//线程池,该线程池用来执行计算两数之和。
int main(void)
{
//日志的报警等级为INFO
CLogger::setLogLevel(CLogger::DEBUG);
//CLogger::setLogLevel(CLogger::INFO);
//我们设置日志输出到文件
CLogger::setOutputMode(LOGGER_MODE_LOGFILE);

LOG_DEBUG << "---------------------start-------------------";

LOG_DEBUG << "FixedThreadPool test start!";

CFactoryThreadPool *pFactory = new CFactoryThreadPool();
CAbstractThreadPool *pThreadPool = pFactory->newFixedThreadPool(10);

//向线程池提交200个加法运算的任务
for (int i = 0; i < 200; i++)
{
pThreadPool->execute(boost::bind(Add, i, i));
}

pThreadPool->shutdown();

delete pThreadPool;
delete pFactory;

LOG_DEBUG << "----------------------end--------------------";

return 0;
}
9.mainCachedThreadPool.cpp

#include "FactoryThreadPool.h"
#include "Logger.h"
#include <stdio.h>
#include <unistd.h>
#include <boost/bind.hpp>

using namespace Log;

//一个计算两个数之和的任务
void Add(int iValue1, int iValue2)
{
LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
sleep(1);
}

//这是可缓存线程池的测试代码,这里我们创建了一个固定大小为10的
//线程池,该线程池用来执行计算两数之和。
int main(void)
{
//日志的报警等级为INFO
CLogger::setLogLevel(CLogger::DEBUG);
//CLogger::setLogLevel(CLogger::INFO);
//我们设置日志输出到文件
CLogger::setOutputMode(LOGGER_MODE_LOGFILE);

LOG_DEBUG << "---------------------start-------------------";

LOG_DEBUG << "newCachedThreadPool test start!";

CFactoryThreadPool *pFactory = new CFactoryThreadPool();
CAbstractThreadPool *pThreadPool = pFactory->newCachedThreadPool();

//向线程池提交200个加法运算的任务
for (int i = 0; i < 200; i++)
{
pThreadPool->execute(boost::bind(Add, i, i));
}

pThreadPool->shutdown();

delete pThreadPool;
delete pFactory;

LOG_DEBUG << "----------------------end--------------------";

return 0;
}
10.mainSingleThreadPool.cpp

#include "FactoryThreadPool.h"
#include "Logger.h"
#include <stdio.h>
#include <unistd.h>
#include <boost/bind.hpp>

using namespace Log;

//一个计算两个数之和的任务
void Add(int iValue1, int iValue2)
{
LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
sleep(1);
}

//这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的
//线程池,该线程池用来执行计算两数之和。
int main(void)
{
//日志的报警等级为INFO
CLogger::setLogLevel(CLogger::DEBUG);
//CLogger::setLogLevel(CLogger::INFO);
//我们设置日志输出到文件
CLogger::setOutputMode(LOGGER_MODE_LOGFILE);

LOG_DEBUG << "---------------------start-------------------";

LOG_DEBUG << "SingleThreadPool test start!";

CFactoryThreadPool *pFactory = new CFactoryThreadPool();
CAbstractThreadPool *pThreadPool = pFactory->newSingleThreadExecutor();

//向线程池提交200个加法运算的任务
for (int i = 0; i < 200; i++)
{
pThreadPool->execute(boost::bind(Add, i, i));
}

pThreadPool->shutdown();

delete pThreadPool;
delete pFactory;

LOG_DEBUG << "----------------------end--------------------";

return 0;
}
11.mainThreadPool.cpp

#include "ThreadPool.h"
#include "Logger.h"
#include "LinkedBlockingQueue.h"
#include <stdio.h>
#include <unistd.h>
#include <boost/bind.hpp>

using namespace Log;

//一个计算两个数之和的任务
void Add(int iValue1, int iValue2)
{
LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
sleep(2);
}

//这是线程池的测试代码,这里我们创建了一个大小为2的
//线程池,该线程池最大允许的线程数为4,
//该线程池用来执行计算两数之和。
int main(void)
{
//日志的报警等级为INFO
CLogger::setLogLevel(CLogger::DEBUG);
//CLogger::setLogLevel(CLogger::INFO);
//我们设置日志输出到文件
CLogger::setOutputMode(LOGGER_MODE_LOGFILE);

LOG_DEBUG << "---------------------start-------------------";

LOG_DEBUG << "CommonThreadPool test start!";

CThreadPool *pThreadPool = new CThreadPool(2, 4, 30, new CLinkedBlockingQueue<Task>(10));

//向线程池提交200个加法运算的任务
for (int i = 0; i < 15; i++)
{
pThreadPool->execute(boost::bind(Add, i, i));
}

pThreadPool->shutdown();

delete pThreadPool;

LOG_DEBUG << "----------------------end--------------------";

return 0;
}

我们共给出了4个测试代码,包括三个工厂类创建 的线程池和直接只有CThreadPool创建的线程池的测试。这里没有贴出具体的测试结果。因为测试结果输出是比较多的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: