您的位置:首页 > 编程语言 > C语言/C++

基于C++11实现的线程池

2017-04-06 20:07 801 查看
1.C++11中引入了lambada表达式,很好的支持异步编程

2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性

3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性

4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型

5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题

/************************************************************************/
/*                                                                      */
/************************************************************************/

#ifndef __CARBON_THREAD_POOL_H
#define __CARBON_THREAD_POOL_H

#include <vector>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <string>
#include <sstream>
#include <deque>

namespace CARBON {

//************************************
// Method:    Create
// Returns:   std::shared_ptr
// Qualifier: 用于创建智能指针实例
// Parameter: args, 可变参数,接受任意个数的参数,传递给T的构造函数
//************************************
template<typename T, typename... ARG>
std::shared_ptr<T> Create(ARG&&... args)
{
struct TEnableShared : public T
{
TEnableShared(ARG&&... args)
: T(std::forward<ARG>(args)...)
{}
};

return std::make_shared<TEnableShared>(std::forward<ARG>(args)...);
}

class ThreadPool : public std::enable_shared_from_this<ThreadPool>
{
protected:
ThreadPool()
: _stop(false)
{}

virtual ~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(_lock);
_stop = true;
}
_condition.notify_all();
for (std::thread &worker : _workers)
worker.join();
}

public:
// initialize thread pool with number of threads
bool InitializePool(size_t threads)
{
if (!_workers.empty()) return true;

for (size_t i = 0; i < threads; ++i)
{
std::weak_ptr<ThreadPool> _wtp = this->shared_from_this();
auto th = [](std::weak_ptr<ThreadPool> wtp) {
for (;;)
{
std::function<void()> task;

{
std::shared_ptr<ThreadPool> stp = wtp.lock();
if (!stp)
return;

std::unique_lock<std::mutex> lock(stp->_lock);
auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); };
stp->_condition.wait(lock, shipment);
if (stp->_stop)
return;
if (stp->_tasks.empty())
continue;
task = std::move(stp->_tasks.front()).task;
stp->_tasks.pop_front();
}

task();
}
};
_workers.emplace_back(th, _wtp);
}

return !_workers.empty();
}

//************************************
// Method:    EnqueueTask
// Returns:   std::future, 值类型由functor f指定
// Qualifier: 可以借由返回的std::future获取结果,但是更建议在functor中做异步通知
// Parameter: taskid 用于接受任务的id描述
// Parameter: functor f, 函数对象,用于执行任务
// Parameter: args, 可变参数,接受任意个数的参数,传递给functor f
//************************************
template<class F, class... Args>
auto EnqueueTask(std::string& taskid, F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>
{
if (_workers.empty())
throw std::runtime_error("ThreadPool not initialized yet");

using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(_lock);

// don't allow enqueueing after stopping the pool
if (_stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

stThreadTask st;
std::stringstream ss;
ss << (void*)task.get();
ss >> taskid;
st.taskid = taskid;
st.task = [task]() { (*task)(); };
_tasks.push_back(st);
}
_condition.notify_one();
return res;
}

//************************************
// Method:    GetTasksSize
// Returns:   size_t
// Qualifier: 获取等待任务队列的任务数,正在执行的任务已经弹出队列,所以不参与计算
//************************************
size_t GetTasksSize()
{
std::unique_lock<std::mutex> lock(_lock);
return _tasks.size();
}

//************************************
// Method:    RemoveTask
// Returns:   bool, 找到任务并移除则返回true,否则返回false
// Qualifier: 正在执行的任务已经弹出任务队列,应该在其它地方通知任务退出
// Qualifier: 执行完成的任务已经弹出任务队列,无法移除不存在的任务
// Qualifier: 该接口只能移除处在等待中的任务
// Parameter: taskid是任务的唯一标示,由EnqueueTask返回
//************************************
bool RemoveTask(const std::string& taskid)
{
std::unique_lock<std::mutex> lock(_lock);
for (auto& t = _tasks.begin(); t != _tasks.end(); ++t)
{
if (taskid == t->taskid)
{
_tasks.erase(t);
return true;
}
}

return false;
}

private:
typedef struct stThreadTask
{
std::function<void()> task;
std::string taskid;
}stThreadTask;

// need to keep track of threads so we can join them
std::vector< std::thread > _workers;
// the task queue
std::deque< stThreadTask > _tasks;

// synchronization
std::mutex _lock;
std::condition_variable _condition;
bool _stop;
};
}

#endif


使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问

weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性

由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现

ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例

应该使用Create函数来生成ThreadPool实例

测试代码如下:

namespace {
std::condition_variable _exit_cv;
}

void func(int n)
{
std::cout << "func with n " << n << std::endl;
}

using CARBON::ThreadPool;

std::string taskid;
std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>();
std::weak_ptr<ThreadPool> _wtp = stp;
stp->InitializePool(2);

stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int {
std::cout << "task1\n";

for (int i = 0; i < 5; ++i) {
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout)
break;

if (cbf) cbf(i);
if (wtp.expired())
break;
}

return 5;
}, func, _wtp);


当需要中断线程执行时,应该在外部通知线程中的任务自行退出

例子中可以在主线程中这么做

_exit_cv.notify_all();


_exit_cv用于模拟sleep操作
func用于模拟任务结果的异步通知,这里为了省事使用了函数指针,实际工作中应该使用functor来传递,以保证生命周期的有效性
比如std::bind和shared_ptr一起构造的functor对象
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: