基于C++11实现的线程池
2017-04-06 20:07
801 查看
1.C++11中引入了lambada表达式,很好的支持异步编程
2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性
3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性
4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型
5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题
使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问
weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性
由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现
ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例
应该使用Create函数来生成ThreadPool实例
测试代码如下:
当需要中断线程执行时,应该在外部通知线程中的任务自行退出
例子中可以在主线程中这么做
2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性
3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性
4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型
5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题
/************************************************************************/ /* */ /************************************************************************/ #ifndef __CARBON_THREAD_POOL_H #define __CARBON_THREAD_POOL_H #include <vector> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #include <string> #include <sstream> #include <deque> namespace CARBON { //************************************ // Method: Create // Returns: std::shared_ptr // Qualifier: 用于创建智能指针实例 // Parameter: args, 可变参数,接受任意个数的参数,传递给T的构造函数 //************************************ template<typename T, typename... ARG> std::shared_ptr<T> Create(ARG&&... args) { struct TEnableShared : public T { TEnableShared(ARG&&... args) : T(std::forward<ARG>(args)...) {} }; return std::make_shared<TEnableShared>(std::forward<ARG>(args)...); } class ThreadPool : public std::enable_shared_from_this<ThreadPool> { protected: ThreadPool() : _stop(false) {} virtual ~ThreadPool() { { std::unique_lock<std::mutex> lock(_lock); _stop = true; } _condition.notify_all(); for (std::thread &worker : _workers) worker.join(); } public: // initialize thread pool with number of threads bool InitializePool(size_t threads) { if (!_workers.empty()) return true; for (size_t i = 0; i < threads; ++i) { std::weak_ptr<ThreadPool> _wtp = this->shared_from_this(); auto th = [](std::weak_ptr<ThreadPool> wtp) { for (;;) { std::function<void()> task; { std::shared_ptr<ThreadPool> stp = wtp.lock(); if (!stp) return; std::unique_lock<std::mutex> lock(stp->_lock); auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); }; stp->_condition.wait(lock, shipment); if (stp->_stop) return; if (stp->_tasks.empty()) continue; task = std::move(stp->_tasks.front()).task; stp->_tasks.pop_front(); } task(); } }; _workers.emplace_back(th, _wtp); } return !_workers.empty(); } //************************************ // Method: EnqueueTask // Returns: std::future, 值类型由functor f指定 // Qualifier: 可以借由返回的std::future获取结果,但是更建议在functor中做异步通知 // Parameter: taskid 用于接受任务的id描述 // Parameter: functor f, 函数对象,用于执行任务 // Parameter: args, 可变参数,接受任意个数的参数,传递给functor f //************************************ template<class F, class... Args> auto EnqueueTask(std::string& taskid, F&& f, Args&&... args) ->std::future<typename std::result_of<F(Args...)>::type> { if (_workers.empty()) throw std::runtime_error("ThreadPool not initialized yet"); using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(_lock); // don't allow enqueueing after stopping the pool if (_stop) throw std::runtime_error("enqueue on stopped ThreadPool"); stThreadTask st; std::stringstream ss; ss << (void*)task.get(); ss >> taskid; st.taskid = taskid; st.task = [task]() { (*task)(); }; _tasks.push_back(st); } _condition.notify_one(); return res; } //************************************ // Method: GetTasksSize // Returns: size_t // Qualifier: 获取等待任务队列的任务数,正在执行的任务已经弹出队列,所以不参与计算 //************************************ size_t GetTasksSize() { std::unique_lock<std::mutex> lock(_lock); return _tasks.size(); } //************************************ // Method: RemoveTask // Returns: bool, 找到任务并移除则返回true,否则返回false // Qualifier: 正在执行的任务已经弹出任务队列,应该在其它地方通知任务退出 // Qualifier: 执行完成的任务已经弹出任务队列,无法移除不存在的任务 // Qualifier: 该接口只能移除处在等待中的任务 // Parameter: taskid是任务的唯一标示,由EnqueueTask返回 //************************************ bool RemoveTask(const std::string& taskid) { std::unique_lock<std::mutex> lock(_lock); for (auto& t = _tasks.begin(); t != _tasks.end(); ++t) { if (taskid == t->taskid) { _tasks.erase(t); return true; } } return false; } private: typedef struct stThreadTask { std::function<void()> task; std::string taskid; }stThreadTask; // need to keep track of threads so we can join them std::vector< std::thread > _workers; // the task queue std::deque< stThreadTask > _tasks; // synchronization std::mutex _lock; std::condition_variable _condition; bool _stop; }; } #endif
使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问
weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性
由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现
ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例
应该使用Create函数来生成ThreadPool实例
测试代码如下:
namespace { std::condition_variable _exit_cv; } void func(int n) { std::cout << "func with n " << n << std::endl; } using CARBON::ThreadPool; std::string taskid; std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>(); std::weak_ptr<ThreadPool> _wtp = stp; stp->InitializePool(2); stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int { std::cout << "task1\n"; for (int i = 0; i < 5; ++i) { std::mutex mtx; std::unique_lock<std::mutex> lck(mtx); if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout) break; if (cbf) cbf(i); if (wtp.expired()) break; } return 5; }, func, _wtp);
当需要中断线程执行时,应该在外部通知线程中的任务自行退出
例子中可以在主线程中这么做
_exit_cv.notify_all();
_exit_cv用于模拟sleep操作 func用于模拟任务结果的异步通知,这里为了省事使用了函数指针,实际工作中应该使用functor来传递,以保证生命周期的有效性 比如std::bind和shared_ptr一起构造的functor对象
相关文章推荐
- 基于C++11 thread 实现线程池
- 基于c++11的100行实现简单线程池
- 转 一个基于ACE的负载自适应万能线程池实现
- C++实现的带最大最小线程数的线程池(基于ACE)
- 一个C++基于boost简单实现的线程池
- 自己封装的数据源(支持多种数据库访问,内部实现线程池 基于bonecp)
- 基于生产-消费者模式的任务异步线程池设计与实现
- C++11简化线程池的实现
- 使用C++11 实现的线程池
- 基于C++11的线程池
- 无锁编程:c++11基于atomic实现共享读写锁(写优先)
- 基于线程池实现linux下守护进程运行web目录服务器
- 基于C++11的阻塞队列简单实现
- 基于ACE的线程池学习与实现(三)—— 并发编程资料
- C++11:借助C++11特性简单高效实现线程池
- 一个基于ACE的线程池的实现(转)
- 基于ACE的线程池学习与实现(二)——ACE_Message_Block与ACE_Condition
- 基于半同步/半反应堆线程池实现的HTTP解析服务端程序
- c++11 实现半同步半异步线程池
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池