您的位置:首页 > 编程语言 > C语言/C++

C++ 线程池的简易实现

2015-08-30 14:57 585 查看
首先,先简单介绍,线程池的工作原理。

1.他自身拥有一定数量的线程数组 threads,处于等待状态,等待唤醒(通过条件变量)

2.拥有一个任务队列 m_tasks,存储用户的任务,有新任务以后,唤醒线程,取出任务,通过回调函数的方式调用任务,执行完以后继续等待。

使用情况:线程池,适用于会话简短的情况下,http访问可以使用线程池,如需要长时间保持通讯的,如会话,就不要用线程池了。

本例子,采用单例模式,线程安全。

公开接口两个:

static CMyThreadPool * getInstance();
bool start(Task fun);

用户的函数 fun 的参数,可通过,bind来传递,不过要注意,如果传的是指针,需要注意他的生存周期,如果传的是 new,处理完以后,要自己 delete.

void showTicket(mutex* m){
lock_guard<std::mutex> l(*m);
cout <<" show ticket: " << ticket++ << endl;

}

pool->start(bind(showTicket, m));


头文件:

//定义一个函数对象类型
typedef std::function<void()> Task;

class CMyThreadPool
{
private:
int max_thread;                // max thread;
int max_task;                // max task;
// thread array:
vector<thread> threads;
// task queue:
queue<Task> m_tasks;
// lock:
mutex m_lock;
// condition:
condition_variable has_task;
bool running_flag;
public:
~CMyThreadPool(void);
  //获取线程池对象指针
static CMyThreadPool * getInstance();
  //添加任务,成功返回true,失败返回false
bool start(Task fun);
private:
CMyThreadPool(void);
bool InitThread();
void DestroyPool();
  //工作线程
void WorkFun();
static CMyThreadPool * m_pool;
static std::mutex *singal_mutex;
};


实现:

#include "MyThreadPool.h"

CMyThreadPool * CMyThreadPool::m_pool = NULL;
mutex* CMyThreadPool::singal_mutex = new mutex();

CMyThreadPool::CMyThreadPool(void):max_thread(default_max_thread),
max_task(default_max_task),running_flag(true)
{
}

CMyThreadPool::~CMyThreadPool(void)
{
DestroyPool();
}

CMyThreadPool * CMyThreadPool::getInstance()
{
if( NULL == m_pool){
//lock();
std::lock_guard<std::mutex> l(*singal_mutex);
if( NULL == m_pool){
m_pool = new CMyThreadPool();
}
//unlock();
}
return m_pool;
}

bool CMyThreadPool::start( Task fun )
{
//判断是否第一次,延缓线程初始化
{
if( threads.size() == 0){
unique_lock<mutex> l(m_lock);
if( threads.size() == 0){
//初始化线程
if(!InitThread()){
return false;
}
}
}
}
//判断工作队列是否已满,没满则加入工作队列
{
unique_lock<mutex> l(m_lock);
if( (unsigned int)max_task > m_tasks.size()){
m_tasks.push(fun);
}else{
return false;
}
}
//唤醒一个线程
has_task.notify_one();
return true;
}

//已经上着锁了
bool CMyThreadPool::InitThread()
{
for (int i = 0; i != max_thread; i++){
threads.push_back(thread(&CMyThreadPool::WorkFun, this));
}
return true;
}

void CMyThreadPool::WorkFun()
{
while(running_flag || !m_tasks.empty()){
Task t;
//获取task
{
unique_lock<mutex> l(m_lock);
while( m_tasks.empty())
has_task.wait(l);
t = m_tasks.front();
m_tasks.pop();
}
//执行task
t();
}
}

void CMyThreadPool::DestroyPool()
{
{
unique_lock<mutex> u_lock(m_lock);
running_flag = false;
}
has_task.notify_all();

for( auto &t : threads){
t.join();
}
threads.clear();
}


测试用例:

#include <iostream>
#include "MyThreadPool.h"
#include <memory>
#define _CRTDBG_MAP_ALLOC
#include <crtdbg.h>
#include <Windows.h>

using namespace std;
int ticket = 0;

void showTicket(mutex* m){
lock_guard<std::mutex> l(*m);
#ifdef WIN32
//打印当前线程号
cout << "Thread id: " << GetCurrentThreadId();

#endif
cout <<" show ticket: " << ticket++ << endl;

}

int main(){
mutex *m = new mutex;
int sum = 0;
{
std::shared_ptr<CMyThreadPool> pool(CMyThreadPool::getInstance());
for(int i = 0; i < 100;i++){
if(!pool->start(bind(showTicket, m))){
sum++;
}
}
}
cout << "not use task : "<< sum << endl;
delete m;
_CrtDumpMemoryLeaks();
system("pause");
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: