C++线程池-ThreadPool
2017-10-22 11:40
567 查看
今天介绍一下线程池,线程池还是比较简单的,使用线程函数开辟几条线程,每条线程使用一个循环检测任务队列,当有任务时执行,这里为了避免CPU的空转,当有新任务加入队列时,通知一个线程去执行它。
这个线程池具有以下特点:
1、定时检测线程数量,关闭空闲的多余线程;
2、像最大线程数,保持线程数是可以在运行时改变的,虽然基本上没有这个必要;
3、这个线程池是单例的,我在使用时会在程序启动时创建它,然后获取其实例来使用。
这个线程池是在windows下使用boost库的线程函数创建线程的,在linux下可以使用pthread_create来创建线程。
这个线程池具有以下特点:
1、定时检测线程数量,关闭空闲的多余线程;
2、像最大线程数,保持线程数是可以在运行时改变的,虽然基本上没有这个必要;
3、这个线程池是单例的,我在使用时会在程序启动时创建它,然后获取其实例来使用。
这个线程池是在windows下使用boost库的线程函数创建线程的,在linux下可以使用pthread_create来创建线程。
#ifndef __THREADPOOL_H__ #define __THREADPOOL_H__ #include <string> #include <list> #include <boost/thread/thread.hpp> #include "TimeWheel.h"//我之前写的时间轮定时器 #include "TempLock.h"//锁 #include "CondiTion.h"//信号量 typedef void(*HandleMsgCb)(std::string msgType, void* msgBody, int msgLen);//执行时使用的函数指针 typedef void(*exeCb)(void* arg);//任务的格式 //线程池存储消息的结构体 typedef struct ThreadPoolMsgHolder { void* cb; //回调 void* arg;//参数 std::string msgType; void* msgBody; int msgLen; // -1 的时候为取消,-2为线程池,-3为消息队列 }; class ThreadPool { private: static ThreadPool* threadPoolInstance; // 唯一实例 public: ThreadPool(void); ThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum); ~ThreadPool(void); private: TimeWheel timer; TempLock tempLock; CondiTion threadCond; std::list<ThreadPoolMsgHolder*> msgHolders; // 消息载体 int curThreadNum; // 剩余的线程数量 int initThreadNum; // 初始化线程的数量 int holdThreadNum;// 保持的线程数量 int maxThreadNum; // 最大的线程数量 int idleThreadNum; // 空闲线程数量 HandleMsgCb handleMsgCb; // 处理消息的回调 int checkIdleTime; // 1分钟 1 * 1000 public: void setInitThreadNum(int initThreadNum); void setHoldThreadNum(int holdThreadNum); void setMaxThreadNum(int maxThreadNum); void setIdleThreadNum(int idleThreadNu dde9 m); int getCurThreadNum(); int getInitThreadNum(); int getHoldThreadNum(); int getMaxThreadNum(); int getIdleThreadNum(); void startMQ(HandleMsgCb cb); void pushMsg(std::string msgType, void* msgBody, int msgLen); // 压人消息 void pushMsg(ThreadPoolMsgHolder* msgHolder); // 压人消息 ThreadPoolMsgHolder* popMsg(); // 弹出消息 int getTaskQueLen(); // 获取消息队列的长度 void executeInner(exeCb cb, void* arg); // 执行函数 void start(); // 开启线程池 private: void addThread(); // 添加线程 void addCurThreadNum(); // 增加当前线程数量 void subCurThreadNum();// 减去当前线程数量 void startThread(); // 开启线程 int startTimerMonitor(); // 开启定时器检查 void StopThread(void* arg); bool isStopExcessiveThread(); // 是否关闭过多的线程 bool isStopIdleThread(); public: static void InitThreadPool(int initThreadNum = 1, int holdThreadNum = 1, int maxThreadNum = 1); static void run(void* arg); static void ThreadMonitor(void* arg); static ThreadPool* getInstance(); static void execute(exeCb cb, void* arg); }; #endif
#include "ThreadPool.h" #include <windows.h> #include <process.h> ThreadPool* ThreadPool::threadPoolInstance = NULL; ThreadPool::ThreadPool(void) : initThreadNum(4) // 初始线程数量 , curThreadNum(0) // 当前线程数量 , holdThreadNum(10) // 空闲时保持的线程数量 , maxThreadNum(20) // 最大可开的线程数量 , idleThreadNum(0) // 当前空闲线程数量 , handleMsgCb(NULL) // 处理函数的回调 , checkIdleTime(10 * 60 * 1000) // 十分钟检查一次 { } ThreadPool::ThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum) : initThreadNum(initThreadNum) // 初始线程数量 , curThreadNum(0) // 当前线程数量 , holdThreadNum(holdThreadNum) // 空闲时hold住的线程数量 , maxThreadNum(maxThreadNum) // 最大可开的线程数量 , idleThreadNum(0) // 当前空闲线程数量 , handleMsgCb(NULL) // 处理函数的回调 , checkIdleTime(10 * 60 * 1000) // 十分钟检查一次 { } ThreadPool::~ThreadPool(void) { } //返回唯一实例的句柄 ThreadPool* ThreadPool::getInstance() { return threadPoolInstance; } //几个设置参数的函数 void ThreadPool::setInitThreadNum(int initThreadNum) { this->initThreadNum = initThreadNum; } void ThreadPool::setHoldThreadNum(int holdThreadNum) { this->holdThreadNum = holdThreadNum; } void ThreadPool::setMaxThreadNum(int maxThreadNum) { this->maxThreadNum = maxThreadNum; } //获取当前的线程数量 int ThreadPool::getCurThreadNum() { tempLock.lock(); int curThreadNum = this->curThreadNum; tempLock.unlock(); return curThreadNum; } //获取参数的一些函数 int ThreadPool::getInitThreadNum() { return this->initThreadNum; } int ThreadPool::getHoldThreadNum() { return this->holdThreadNum; } int ThreadPool::getMaxThreadNum() { return this->maxThreadNum; } //获取空闲线程数量 int ThreadPool::getIdleThreadNum() { tempLock.lock(); int idleThreadNum = this->idleThreadNum; tempLock.unlock(); return idleThreadNum; } // void ThreadPool::startMQ(HandleMsgCb cb){ tempLock.lock(); if (this->handleMsgCb == NULL) { this->handleMsgCb = cb; startThread(); } tempLock.unlock(); } void ThreadPool::start() { startMQ(NULL); } //开始运行 void ThreadPool::startThread() { for (int i = 0; i < initThreadNum; i++) { addThread();//增加线程 } } int ThreadPool::startTimerMonitor() { // 开启定时器检查 return timer.setTimer(checkIdleTime, ThreadMonitor, this, NULL); } // 添加线程 void ThreadPool::addThread() { ++curThreadNum; boost::thread(run, this);//使用boost库的函数开启一条线程 } void ThreadPool::pushMsg(std::string msgType, void* msgBody, int msgLen) { // 压人消息 ThreadPoolMsgHolder* msgHolder = new ThreadPoolMsgHolder; msgHolder->msgType = msgType; msgHolder->msgBody = msgBody; msgHolder->msgLen = msgLen; pushMsg(msgHolder);//放入队列 } void ThreadPool::pushMsg(ThreadPoolMsgHolder* msgHolder) { tempLock.lock(); msgHolders.push_back(msgHolder); if ((idleThreadNum == 0) && (curThreadNum < maxThreadNum))//判断可用线程,是否增加线程 { addThread(); } else { threadCond.notify_one(); //通知有任务可以执行啦 } tempLock.unlock(); } //获取任务结构体 ThreadPoolMsgHolder* ThreadPool::popMsg() { tempLock.lock(); ++idleThreadNum; // 空闲的线程 while (msgHolders.size() == 0) // 注意这里,如果任务队列一直是0 { // 在这判断当前的线程是否大于最大的holder线程 if (idleThreadNum > holdThreadNum) { startTimerMonitor();// 监控空闲线程 } threadCond.wait(&tempLock);//等待任务执行通知 } --idleThreadNum; // 拿到消息就不空闲了 ThreadPoolMsgHolder *msgHolder = msgHolders.front();//获取第一个任务 msgHolders.pop_front();//获取 tempLock.unlock(); return msgHolder;//返回任务内容 } int ThreadPool::getTaskQueLen(){ // 获取消息队列的长度 tempLock.lock(); int msgNum = this->msgHolders.size(); tempLock.unlock(); return msgNum; } bool ThreadPool::isStopExcessiveThread() { // 是否关闭过多的线程 return false; } void ThreadPool::addCurThreadNum(){ // 增加当前线程数量 tempLock.lock(); ++curThreadNum; tempLock.unlock(); } void ThreadPool::subCurThreadNum(){// 减少当前线程数量 tempLock.lock(); --curThreadNum; tempLock.unlock(); } //线程的处理循环 void ThreadPool::run(void* arg) { printf("创建线程----线程id = %d\n", GetCurrentThreadId()); ThreadPool* bpmq = (ThreadPool*)arg; bool stopFlag = false; while (!stopFlag)//判断线程是否结束 { ThreadPoolMsgHolder* msgHolder = bpmq->popMsg(); // 从队列里取消息 switch (msgHolder->msgLen)//根据长度判断消息类型 { case -1: // 关闭线程 { bpmq->tempLock.lock(); if (bpmq->isStopIdleThread()) //返回是否关闭空闲线程的flag { bpmq->subCurThreadNum(); stopFlag = true; printf("关闭线程=%d,剩余=%d\n", GetCurrentThreadId(), bpmq->curThreadNum); } bpmq->tempLock.unlock(); } break; case -2: //执行任务回调 ((exeCb)msgHolder->cb)(msgHolder->arg); // 执行线程了 break; default: bpmq->handleMsgCb(msgHolder->msgType, msgHolder->msgBody, msgHolder->msgLen); break; } delete msgHolder;//执行过后删除任务结构 } } //是否关闭空闲线程的flag bool ThreadPool::isStopIdleThread() { tempLock.lock(); bool isStop = idleThreadNum >= holdThreadNum; tempLock.unlock(); return isStop; } //线程检查 void ThreadPool::ThreadMonitor(void* arg) { ThreadPool* bpmq = (ThreadPool*)arg; bpmq->pushMsg("exit", NULL, -1);//放入关闭线程的消息 } //执行线程 void ThreadPool::executeInner(exeCb cb, void* arg) { ThreadPoolMsgHolder* msgHolder = new ThreadPoolMsgHolder; msgHolder->cb = cb; msgHolder->arg = arg; msgHolder->msgLen = -2;// 线程池执行码 pushMsg(msgHolder);//将任务放入队列 } //执行任务 void ThreadPool::execute(exeCb cb, void* arg) { getInstance()->executeInner(cb, arg); } //初始化线程池 void ThreadPool::InitThreadPool(int initThreadNum, int holdThreadNum, int maxThreadNum) { threadPoolInstance = new ThreadPool(initThreadNum, holdThreadNum, maxThreadNum); threadPoolInstance->startThread();//开始运行 }
相关文章推荐
- 在linux下使用c++线程池threadpool
- C++ 简单的线程池 thread_pool
- 在linux下使用c++线程池threadpool
- 在linux下使用c++线程池threadpool
- 开源线程池组件SmartThreadPool
- 实现Spring整合线程池ThreadPoolTaskExecutor
- Java线程2-4 单任务线程池SingleThreadPool
- newScheduledThreadPool延时任务线程池,实现原理
- Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现
- 高效线程池(threadpool)的实现
- [深入学习C#]C#实现多线程的方法:线程(Thread类)和线程池(ThreadPool)
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池
- 多线程系列(2)线程池ThreadPool
- Spring ThreadPoolTaskExecutor线程池
- ThreadPool(线程池)
- Java-线程池 ThreadPool 专题详解 (美团面试题)
- c++ threadpool.cpp
- 线程池--jetty中QueuedThreadPool分析(一)
- 一个线程池(ThreadPool)的使用
- C#多线程编程(3)-线程池ThreadPool