您的位置:首页 > 编程语言 > C语言/C++

C++线程池-ThreadPool

2017-10-22 11:40 567 查看
今天介绍一下线程池,线程池还是比较简单的,使用线程函数开辟几条线程,每条线程使用一个循环检测任务队列,当有任务时执行,这里为了避免CPU的空转,当有新任务加入队列时,通知一个线程去执行它。

这个线程池具有以下特点:

1、定时检测线程数量,关闭空闲的多余线程;

2、像最大线程数,保持线程数是可以在运行时改变的,虽然基本上没有这个必要;

3、这个线程池是单例的,我在使用时会在程序启动时创建它,然后获取其实例来使用。

这个线程池是在windows下使用boost库的线程函数创建线程的,在linux下可以使用pthread_create来创建线程。

#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include <string>
#include <list>
#include <boost/thread/thread.hpp>
#include "TimeWheel.h"//我之前写的时间轮定时器
#include "TempLock.h"//锁
#include "CondiTion.h"//信号量

typedef void(*HandleMsgCb)(std::string msgType, void* msgBody, int msgLen);//执行时使用的函数指针
typedef void(*exeCb)(void* arg);//任务的格式

//线程池存储消息的结构体
typedef struct ThreadPoolMsgHolder {
void* cb; //回调
void* arg;//参数
std::string msgType;
void* msgBody;
int msgLen; // -1 的时候为取消,-2为线程池,-3为消息队列
};
class ThreadPool
{
private:
static ThreadPool* threadPoolInstance; // 唯一实例

public:
ThreadPool(void);
ThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum);
~ThreadPool(void);
private:
TimeWheel timer;
TempLock tempLock;
CondiTion threadCond;

std::list<ThreadPoolMsgHolder*> msgHolders; // 消息载体
int curThreadNum; // 剩余的线程数量
int initThreadNum; // 初始化线程的数量
int holdThreadNum;// 保持的线程数量
int maxThreadNum; // 最大的线程数量
int idleThreadNum; // 空闲线程数量
HandleMsgCb handleMsgCb; // 处理消息的回调
int checkIdleTime; // 1分钟 1 * 1000
public:
void setInitThreadNum(int initThreadNum);
void setHoldThreadNum(int holdThreadNum);
void setMaxThreadNum(int maxThreadNum);
void setIdleThreadNum(int idleThreadNu
dde9
m);

int getCurThreadNum();
int getInitThreadNum();
int getHoldThreadNum();
int getMaxThreadNum();
int getIdleThreadNum();
void startMQ(HandleMsgCb cb);
void pushMsg(std::string msgType, void* msgBody, int msgLen); // 压人消息
void pushMsg(ThreadPoolMsgHolder* msgHolder); // 压人消息
ThreadPoolMsgHolder* popMsg(); // 弹出消息
int getTaskQueLen(); // 获取消息队列的长度
void executeInner(exeCb cb, void* arg); //  执行函数
void start(); // 开启线程池
private:
void addThread(); // 添加线程
void addCurThreadNum(); // 增加当前线程数量
void subCurThreadNum();// 减去当前线程数量
void startThread(); // 开启线程
int startTimerMonitor(); // 开启定时器检查
void StopThread(void* arg);
bool isStopExcessiveThread(); // 是否关闭过多的线程
bool isStopIdleThread();
public:
static void InitThreadPool(int initThreadNum = 1, int holdThreadNum = 1, int maxThreadNum = 1);
static void run(void* arg);
static void ThreadMonitor(void* arg);
static ThreadPool* getInstance();
static void execute(exeCb cb, void* arg);
};
#endif


#include "ThreadPool.h"
#include <windows.h>
#include <process.h>

ThreadPool* ThreadPool::threadPoolInstance = NULL;

ThreadPool::ThreadPool(void) :
initThreadNum(4) // 初始线程数量
, curThreadNum(0) // 当前线程数量
, holdThreadNum(10) // 空闲时保持的线程数量
, maxThreadNum(20) // 最大可开的线程数量
, idleThreadNum(0) // 当前空闲线程数量
, handleMsgCb(NULL) // 处理函数的回调
, checkIdleTime(10 * 60 * 1000) // 十分钟检查一次
{
}
ThreadPool::ThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum) :
initThreadNum(initThreadNum) // 初始线程数量
, curThreadNum(0) // 当前线程数量
, holdThreadNum(holdThreadNum) // 空闲时hold住的线程数量
, maxThreadNum(maxThreadNum) // 最大可开的线程数量
, idleThreadNum(0) // 当前空闲线程数量
, handleMsgCb(NULL) // 处理函数的回调
, checkIdleTime(10 * 60 * 1000) // 十分钟检查一次

{
}

ThreadPool::~ThreadPool(void)
{
}
//返回唯一实例的句柄
ThreadPool* ThreadPool::getInstance() {
return threadPoolInstance;
}
//几个设置参数的函数
void ThreadPool::setInitThreadNum(int initThreadNum) {
this->initThreadNum = initThreadNum;
}
void ThreadPool::setHoldThreadNum(int holdThreadNum) {
this->holdThreadNum = holdThreadNum;
}
void ThreadPool::setMaxThreadNum(int maxThreadNum) {
this->maxThreadNum = maxThreadNum;
}
//获取当前的线程数量
int ThreadPool::getCurThreadNum() {
tempLock.lock();
int curThreadNum = this->curThreadNum;
tempLock.unlock();
return curThreadNum;
}
//获取参数的一些函数
int ThreadPool::getInitThreadNum() {
return this->initThreadNum;
}
int ThreadPool::getHoldThreadNum() {
return this->holdThreadNum;
}
int ThreadPool::getMaxThreadNum() {
return this->maxThreadNum;
}
//获取空闲线程数量
int ThreadPool::getIdleThreadNum() {
tempLock.lock();
int idleThreadNum = this->idleThreadNum;
tempLock.unlock();
return idleThreadNum;
}
//
void ThreadPool::startMQ(HandleMsgCb cb){
tempLock.lock();
if (this->handleMsgCb == NULL)
{
this->handleMsgCb = cb;
startThread();
}
tempLock.unlock();
}

void ThreadPool::start() {
startMQ(NULL);
}
//开始运行
void ThreadPool::startThread() {
for (int i = 0; i < initThreadNum; i++)
{
addThread();//增加线程
}
}
int ThreadPool::startTimerMonitor() { // 开启定时器检查
return timer.setTimer(checkIdleTime, ThreadMonitor, this, NULL);
}
// 添加线程
void ThreadPool::addThread() {
++curThreadNum;
boost::thread(run, this);//使用boost库的函数开启一条线程

}
void ThreadPool::pushMsg(std::string msgType, void* msgBody, int msgLen) { // 压人消息
ThreadPoolMsgHolder* msgHolder = new ThreadPoolMsgHolder;
msgHolder->msgType = msgType;
msgHolder->msgBody = msgBody;
msgHolder->msgLen = msgLen;
pushMsg(msgHolder);//放入队列
}
void ThreadPool::pushMsg(ThreadPoolMsgHolder* msgHolder) {
tempLock.lock();
msgHolders.push_back(msgHolder);
if ((idleThreadNum == 0) && (curThreadNum < maxThreadNum))//判断可用线程,是否增加线程
{
addThread();
}
else
{
threadCond.notify_one(); //通知有任务可以执行啦
}
tempLock.unlock();
}
//获取任务结构体
ThreadPoolMsgHolder* ThreadPool::popMsg() {
tempLock.lock();
++idleThreadNum; // 空闲的线程
while (msgHolders.size() == 0) //  注意这里,如果任务队列一直是0
{
// 在这判断当前的线程是否大于最大的holder线程
if (idleThreadNum > holdThreadNum)
{
startTimerMonitor();//  监控空闲线程
}
threadCond.wait(&tempLock);//等待任务执行通知
}
--idleThreadNum; // 拿到消息就不空闲了
ThreadPoolMsgHolder *msgHolder = msgHolders.front();//获取第一个任务
msgHolders.pop_front();//获取
tempLock.unlock();
return msgHolder;//返回任务内容
}

int ThreadPool::getTaskQueLen(){ // 获取消息队列的长度
tempLock.lock();
int msgNum = this->msgHolders.size();
tempLock.unlock();
return msgNum;
}
bool ThreadPool::isStopExcessiveThread() { // 是否关闭过多的线程
return false;
}
void ThreadPool::addCurThreadNum(){ // 增加当前线程数量
tempLock.lock();
++curThreadNum;
tempLock.unlock();
}
void ThreadPool::subCurThreadNum(){// 减少当前线程数量
tempLock.lock();
--curThreadNum;
tempLock.unlock();
}
//线程的处理循环
void ThreadPool::run(void* arg) {
printf("创建线程----线程id = %d\n", GetCurrentThreadId());
ThreadPool* bpmq = (ThreadPool*)arg;
bool stopFlag = false;
while (!stopFlag)//判断线程是否结束
{
ThreadPoolMsgHolder* msgHolder = bpmq->popMsg(); // 从队列里取消息
switch (msgHolder->msgLen)//根据长度判断消息类型
{
case -1: // 关闭线程
{
bpmq->tempLock.lock();
if (bpmq->isStopIdleThread()) //返回是否关闭空闲线程的flag
{
bpmq->subCurThreadNum();
stopFlag = true;
printf("关闭线程=%d,剩余=%d\n", GetCurrentThreadId(), bpmq->curThreadNum);
}
bpmq->tempLock.unlock();
}
break;
case -2: //执行任务回调
((exeCb)msgHolder->cb)(msgHolder->arg); // 执行线程了
break;
default:
bpmq->handleMsgCb(msgHolder->msgType, msgHolder->msgBody, msgHolder->msgLen);
break;
}
delete msgHolder;//执行过后删除任务结构
}
}
//是否关闭空闲线程的flag
bool ThreadPool::isStopIdleThread() {
tempLock.lock();
bool isStop = idleThreadNum >= holdThreadNum;
tempLock.unlock();
return isStop;
}
//线程检查
void ThreadPool::ThreadMonitor(void* arg) {
ThreadPool* bpmq = (ThreadPool*)arg;
bpmq->pushMsg("exit", NULL, -1);//放入关闭线程的消息
}
//执行线程
void ThreadPool::executeInner(exeCb cb, void* arg) {
ThreadPoolMsgHolder* msgHolder = new ThreadPoolMsgHolder;
msgHolder->cb = cb;
msgHolder->arg = arg;
msgHolder->msgLen = -2;// 线程池执行码
pushMsg(msgHolder);//将任务放入队列
}
//执行任务
void ThreadPool::execute(exeCb cb, void* arg) {
getInstance()->executeInner(cb, arg);
}
//初始化线程池
void ThreadPool::InitThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum) {
threadPoolInstance = new ThreadPool(initThreadNum, holdThreadNum, maxThreadNum);

threadPoolInstance->startThread();//开始运行
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: