您的位置:首页 > 编程语言 > C语言/C++

基于C++11 thread 实现线程池

2015-08-31 10:39 846 查看
这里基于C++11 thread实现线程池,线程池不可拷贝。

1 nocopyable类

不可拷贝基类继承它,派生类不可拷贝,实现如下

//nocopyable.h

#ifndef NOCOPYABLE_H
#define NOCOPYABLE_H

namespace fivestar
{
class nocopyable
{
private:
nocopyable(const nocopyable& x) = delete;
nocopyable& operator=(const nocopyable&x) = delete;
public:
nocopyable() = default;
~nocopyable() = default;
};

}

#endif // NOCOPYABLE_H




2 ThreadPool类

//ThreadPool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <thread>
#include <mutex>
#include <functional>
#include <string>
#include <condition_variable>
#include <deque>
#include <vector>
#include <memory>

#include "nocopyable.h"
namespace fivestar
{
class ThreadPool:public nocopyable
{
public:
typedef std::function<void()> Task;

explicit ThreadPool(const std::string &name = std::string());
~ThreadPool();

void start(int numThreads);//设置线程数,创建numThreads个线程
void stop();//线程池结束
void run(const Task& f);//任务f在线程池中运行
void setMaxQueueSize(int maxSize) { _maxQueueSize = maxSize; }//设置任务队列可存放最大任务数

private:
bool isFull();//任务队列是否已满
void runInThread();//线程池中每个thread执行的function
Task take();//从任务队列中取出一个任务

std::mutex _mutex;
std::condition_variable _notEmpty;
std::condition_variable _notFull;
std::string _name;
std::vector<std::thread> _threads;
std::deque<Task> _queue;
size_t _maxQueueSize;
bool _running;
};
}

#endif // THREADPOOL_H


//ThreadPool.cpp

#include "ThreadPool.h"
#include <cassert>

using namespace fivestar;
using namespace std;
ThreadPool::ThreadPool(const string &name):
_name(name),
_maxQueueSize(0),
_running(false)
{

}

ThreadPool::~ThreadPool()
{
if(_running)
{
stop();
}
}

void ThreadPool::start(int numThreads)
{
assert(_threads.empty());
_running = true;
_threads.reserve(numThreads);

for(int i = 0;i < numThreads;++i)
{
_threads.push_back(thread(&ThreadPool::runInThread,this));
}
}

void ThreadPool::stop()
{
{
unique_lock<mutex>  lock(_mutex);
_running = false;
_notEmpty.notify_all();
}

for(size_t i = 0;i < _threads.size();++i)
{
_threads[i].join();
}
}

void ThreadPool::run(const Task &f)
{
if(_threads.empty())
{
f();
}
else
{
unique_lock<mutex> lock(_mutex);
while(isFull())
{
_notFull.wait(lock);
}

assert(!isFull());
_queue.push_back(f);
_notEmpty.notify_one();
}
}

ThreadPool::Task ThreadPool::take()
{
unique_lock<mutex> lock(_mutex);

while(_queue.empty() && _running)
{
_notEmpty.wait(lock);
}

Task task;
if(!_queue.empty())
{
task = _queue.front();
_queue.pop_front();

if(_maxQueueSize > 0)
{
_notFull.notify_one();
}
}
return task;
}

bool ThreadPool::isFull()
{
return _maxQueueSize > 0 && _queue.size() >= _maxQueueSize;
}

void ThreadPool::runInThread()
{
try
{
while(_running)
{
Task task = take();
if(task)
{
task();
}
}
}
catch (const exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", _name.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch(...)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", _name.c_str());
}
}


注意:

1 .为线程池添加任务之前一定要调用setMaxQueueSize,设置任务队列可存放的最大任务数,否则线程池退化为但线程

2 若不调用start创建线程,则线程池退化为单线程

3 测试代码

#include <iostream>
#include "ThreadPool.h"

using namespace std;

void Test(int i)
{
printf("I love you %d time\n",i);
}

int main()
{
fivestar::ThreadPool threadPool;
threadPool.setMaxQueueSize(10);
threadPool.start(2);

for(int i = 0;i < 10;++i)
{
auto task = bind(Test,i);
threadPool.run(task);
}

getchar();
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: