c++11 线程安全的队列实现
2016-12-13 11:53
330 查看
用c++11的mutex和condition_variable配合STL的queue可以很方便地实现线程安全的队列,queue用来存取元素,存取元素时用mutex进行全局加锁,存取完成再进行解锁。
但是直接这样实现存在着性能问题,因为每次操作都是独占着锁(1、3),多线程的操作在这里便变成串行化操作。因为queue直接封装了实现,我们只能用push和pop操作元素,这两个操作本身不是原子操作,但是在这里却不得不通过互斥锁来将其变为原子的操作(2、4)。因为受限于queue,我们无法对这里做出任何改变,只能进行全局的加锁保护,这样便无法对性能做出任何改进。另外由于使用queue,容器元素的构造也被放入到了加锁中,而这是不必要的。
要增加操作的并发性能,我们只有自己实现一个底层队列用来存取元素,让push和pop能够部分操作同时进行,缩小加锁的范围,并且将元素的构造操作放在加锁范围外。
观察链表的实现,pop时只需要读取第一个元素即头指针,然后改变头指针的值;push时只需要更改尾指针的next并设置新的尾指针。仅当头指针和尾指针相等的时候,两个操作push_back和pop_front才会出现对内部数据修改的竞争,如果两个指针不相等,那么一个push_back操作和一个pop_front操作可以完全并行地执行,当然还是需要保证多线程的push_back操作串行化且多线程的pop_front操作串行化。也就是说如果队列的头指针和尾指针能够永远不相等,那么就可以使用两个mutex,一个对push_back操作进行加锁,一个对pop_front操作进行加锁,这样就一定保证队列的操作是线程安全的,并且增强了并发性。当然两个指针是不可能永不相等的,普通的链表实现当队列元素个数小于2的时候头尾指针相等,这样通过两个mutex加锁,push_back时需要对两个mutex都加锁,额外的一次加锁是因为当head为空时需要设置head为tail,pop_front时也需要加两个锁,当队列因为pop变为空队列时需要同时设置head和tail。得对链表做一些改进让两个加锁方案能够降低加锁范围。试想,如果head和tail永不为空,那么push_back操作便不需要关心head,只需对一个锁加锁,pop_front操作只需关心队列是否为空,不为空时只需保持对一个锁的加锁。
要保证head和tail永不为空是可以实现的:初始化时链表即有一个空节点,head和tail均指向它,head=tail即代表队列为空,否则不为空。这样push_back时先构造一个元素和空节点,再对tail加锁,设置tail节点元素为刚构造的元素,设置tail的next为新的空节点,设置tail为新空节点,解锁tail。只需对tail加锁。pop_front时,先对head加锁,再对tail加锁,检查队列是否为空(head==tail),解锁tail,根据队列是否为空取出元素,解锁head。对head和tail都加锁,但对tail加锁范围极小。具体实现代码如下:
#include <queue> #include <mutex> #include <memory> #include <condition_variable> template<typename T> class threadsafe_queue { public: threadsafe_queue() {} ~threadsafe_queue() {} void push(T new_data) { std::lock_guard<std::mutex> lk(mut); // 1.全局加锁 data_queue.push(std::move(new_data)); // 2.push时独占锁 cond.notify_one(); } void wait_and_pop(T& val) { std::unique_lock<std::mutex> ulk(mut); // 3.全局加锁 cond.wait(ulk,[this]() { return !data_queue.empty(); }); // 4.front 和 pop_front时独占锁 val=std::move(data_queue.front()); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> ulk(mut); cond.wait(ulk,[this]() { return !data_queue.empty(); }); std::shared_ptr<T> val(std::make_shared<T>(std::move(data_queue.front()))); data_queue.pop(); return val; } bool try_pop(T& val) { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return false; val=std::move(data_queue.front()); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::shared_ptr<T> val; std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return val; val=std::make_shared<T>(std::move(data_queue.front())); data_queue.pop(); return val; } bool empty() { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } private: std::queue<T> data_queue; std::mutex mut; std::condition_variable cond; };
但是直接这样实现存在着性能问题,因为每次操作都是独占着锁(1、3),多线程的操作在这里便变成串行化操作。因为queue直接封装了实现,我们只能用push和pop操作元素,这两个操作本身不是原子操作,但是在这里却不得不通过互斥锁来将其变为原子的操作(2、4)。因为受限于queue,我们无法对这里做出任何改变,只能进行全局的加锁保护,这样便无法对性能做出任何改进。另外由于使用queue,容器元素的构造也被放入到了加锁中,而这是不必要的。
要增加操作的并发性能,我们只有自己实现一个底层队列用来存取元素,让push和pop能够部分操作同时进行,缩小加锁的范围,并且将元素的构造操作放在加锁范围外。
观察链表的实现,pop时只需要读取第一个元素即头指针,然后改变头指针的值;push时只需要更改尾指针的next并设置新的尾指针。仅当头指针和尾指针相等的时候,两个操作push_back和pop_front才会出现对内部数据修改的竞争,如果两个指针不相等,那么一个push_back操作和一个pop_front操作可以完全并行地执行,当然还是需要保证多线程的push_back操作串行化且多线程的pop_front操作串行化。也就是说如果队列的头指针和尾指针能够永远不相等,那么就可以使用两个mutex,一个对push_back操作进行加锁,一个对pop_front操作进行加锁,这样就一定保证队列的操作是线程安全的,并且增强了并发性。当然两个指针是不可能永不相等的,普通的链表实现当队列元素个数小于2的时候头尾指针相等,这样通过两个mutex加锁,push_back时需要对两个mutex都加锁,额外的一次加锁是因为当head为空时需要设置head为tail,pop_front时也需要加两个锁,当队列因为pop变为空队列时需要同时设置head和tail。得对链表做一些改进让两个加锁方案能够降低加锁范围。试想,如果head和tail永不为空,那么push_back操作便不需要关心head,只需对一个锁加锁,pop_front操作只需关心队列是否为空,不为空时只需保持对一个锁的加锁。
要保证head和tail永不为空是可以实现的:初始化时链表即有一个空节点,head和tail均指向它,head=tail即代表队列为空,否则不为空。这样push_back时先构造一个元素和空节点,再对tail加锁,设置tail节点元素为刚构造的元素,设置tail的next为新的空节点,设置tail为新空节点,解锁tail。只需对tail加锁。pop_front时,先对head加锁,再对tail加锁,检查队列是否为空(head==tail),解锁tail,根据队列是否为空取出元素,解锁head。对head和tail都加锁,但对tail加锁范围极小。具体实现代码如下:
#ifndef __THREADSAFE_QUEUE2_H_ #define __THREADSAFE_QUEUE2_H_ #include <mutex> #include <memory> #include <functional> #include <condition_variable> template <typename T> class threadsafe_queue2 { public: threadsafe_queue2() : head(new node) , tail(head.get()) {} threadsafe_queue2(const threadsafe_queue2&) = delete; threadsafe_queue2 operator=(const threadsafe_queue2&) = delete; ~threadsafe_queue2() = default; std::shared_ptr<T> try_pop() { std::unique_lock<std::mutex> ulkh(head_mut); if (head.get() == get_tail()) return nullptr; auto old_head = std::move(head); head = std::move(old_head->next); return old_head->data; } //此版本存在最后一个元素加入后,可能一直阻塞在wait中,下面一个函数不存在此问题 /*std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> ulkh(head_mut); data_cond.wait(ulkh, [&]() { node *old_tail=get_tail(); return head.get() != old_tail; }); auto old_head = std::move(head); head = std::move(old_head->next); return old_head->data; }*/ std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> ulkh(head_mut); { std::unique_lock<std::mutex> ulkt(tail_mut); data_cond.wait(ulkt, [&]() { return head.get() != tail; }); } auto old_head = std::move(head); head = std::move(old_head->next); return old_head->data; } void push(T &&t) { std::shared_ptr<T> new_data(std::make_shared<T>(std::forward<T>(t))); std::unique_ptr<node> new_tail(new node); node *p = new_tail->get(); { std::unique_lock<std::mutex> ulkt(tail_mut); tail->data = new_data; tail->next = std::move(new_tail); tail = p; } data_cond.notify_one(); } private: struct node { std::shared_ptr<T> data; std::unique_ptr<node> next; }; std::mutex head_mut; std::mutex tail_mut; std::unique_ptr<node> head; node *tail; std::condition_variable data_cond; private: node* get_tail() { std::unique_lock<std::mutex> ulkt(tail_mut); return tail; } }; template <typename T> class threadsafe_queue2<T*> { public: threadsafe_queue2() : head(new node) , tail(head) {} threadsafe_queue2(const threadsafe_queue2&) = delete; threadsafe_queue2 operator=(const threadsafe_queue2&) = delete; ~threadsafe_queue2() { node *pre; for (; head != tail;) { pre = head; head = head->next; delete pre; } delete tail; } T* try_pop() { node *old_head = nullptr; { std::unique_lock<std::mutex> ulkh(head_mut); if (head == get_tail()) return nullptr; old_head = head; head = head->next; } T *data = old_head->data; delete old_head; return data; }
//此版本存在最后一个元素加入后,可能一直阻塞在wait中,下面一个函数不存在此问题
/*T* wait_and_pop() { node *old_head = nullptr; { std::unique_lock<std::mutex> ulkh(head_mut); data_cond.wait(ulkh, [&]() { node *old_tail = get_tail(); return head != old_tail; }); old_head = head; head = head->next; } T *data = old_head->data; delete old_head; return data; }*/ T* wait_and_pop() { node *old_head = nullptr; { std::unique_lock<std::mutex> ulkh(head_mut); { std::unique_lock<std::mutex> ulkt(tail_mut); data_cond.wait(ulkt, [&]() { return head != tail; }); } old_head = head; head = head->next; } T *data = old_head->data; delete old_head; return data; } void push(T *t) { node *new_tail = new node; { std::unique_lock<std::mutex> ulkt(tail_mut); tail->data = t; tail->next = new_tail; tail = new_tail; } data_cond.notify_one(); } private: struct node { T *data; node *next; }; std::mutex head_mut; std::mutex tail_mut; node *head; node *tail; std::condition_variable data_cond; private: node* get_tail() { std::unique_lock<std::mutex> ulkt(tail_mut); return tail; } }; #endif
相关文章推荐
- C++11 使用 unique_lock,lock_guard,condition_variable, lambda表达式实现线程安全队列
- linux多线程编程(C):信号量实现的线程安全队列
- 基于C++11的阻塞队列简单实现
- c++11 完全公平队列实现
- 一个队列类的实现(比delphi自带的速度快70倍)(线程安全版本)
- C++11线程安全队列
- 线程安全队列 采用双list实现
- 利用c++11实现线程安全的单例类
- python实现线程安全队列
- C++11 线程安全的BlockingQueue实现
- Linux多线程系列-2-条件变量的使用(线程安全队列的实现)
- 利用条件变量实现线程安全队列
- 用c++11写的一个线程安全的队列
- c++11多线程编程---线程安全队列
- c++11线程安全的队列的类的定义
- linux多线程编程(C):互斥量实现的线程安全队列
- 单例模式实现 线程安全的队列 处理
- C++ 实现线程安全的任务队列
- java实现线程安全的队列
- 用C++实现单例模式3——如何在不使用锁和C++11的情况下,用C++实现线程安全的Singleton