您的位置:首页 > 编程语言 > C语言/C++

c++11 线程安全的队列实现

2016-12-13 11:53 330 查看
    用c++11的mutex和condition_variable配合STL的queue可以很方便地实现线程安全的队列,queue用来存取元素,存取元素时用mutex进行全局加锁,存取完成再进行解锁。

#include <queue>
#include <mutex>
#include <memory>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue() {}
~threadsafe_queue() {}

void push(T new_data)
{
std::lock_guard<std::mutex> lk(mut);            // 1.全局加锁
data_queue.push(std::move(new_data));           // 2.push时独占锁
cond.notify_one();
}
void wait_and_pop(T& val)
{
std::unique_lock<std::mutex> ulk(mut);                    // 3.全局加锁
cond.wait(ulk,[this]() { return !data_queue.empty(); });  // 4.front 和 pop_front时独占锁
val=std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> ulk(mut);
cond.wait(ulk,[this]() { return !data_queue.empty(); });
std::shared_ptr<T> val(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return val;
}
bool try_pop(T& val)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
val=std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::shared_ptr<T> val;
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return val;
val=std::make_shared<T>(std::move(data_queue.front()));
data_queue.pop();
return val;
}
bool empty()
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
private:
std::queue<T> data_queue;
std::mutex mut;
std::condition_variable cond;
};


    但是直接这样实现存在着性能问题,因为每次操作都是独占着锁(1、3),多线程的操作在这里便变成串行化操作。因为queue直接封装了实现,我们只能用push和pop操作元素,这两个操作本身不是原子操作,但是在这里却不得不通过互斥锁来将其变为原子的操作(2、4)。因为受限于queue,我们无法对这里做出任何改变,只能进行全局的加锁保护,这样便无法对性能做出任何改进。另外由于使用queue,容器元素的构造也被放入到了加锁中,而这是不必要的。

    要增加操作的并发性能,我们只有自己实现一个底层队列用来存取元素,让push和pop能够部分操作同时进行,缩小加锁的范围,并且将元素的构造操作放在加锁范围外。

    观察链表的实现,pop时只需要读取第一个元素即头指针,然后改变头指针的值;push时只需要更改尾指针的next并设置新的尾指针。仅当头指针和尾指针相等的时候,两个操作push_back和pop_front才会出现对内部数据修改的竞争,如果两个指针不相等,那么一个push_back操作和一个pop_front操作可以完全并行地执行,当然还是需要保证多线程的push_back操作串行化且多线程的pop_front操作串行化。也就是说如果队列的头指针和尾指针能够永远不相等,那么就可以使用两个mutex,一个对push_back操作进行加锁,一个对pop_front操作进行加锁,这样就一定保证队列的操作是线程安全的,并且增强了并发性。当然两个指针是不可能永不相等的,普通的链表实现当队列元素个数小于2的时候头尾指针相等,这样通过两个mutex加锁,push_back时需要对两个mutex都加锁,额外的一次加锁是因为当head为空时需要设置head为tail,pop_front时也需要加两个锁,当队列因为pop变为空队列时需要同时设置head和tail。得对链表做一些改进让两个加锁方案能够降低加锁范围。试想,如果head和tail永不为空,那么push_back操作便不需要关心head,只需对一个锁加锁,pop_front操作只需关心队列是否为空,不为空时只需保持对一个锁的加锁。

    要保证head和tail永不为空是可以实现的:初始化时链表即有一个空节点,head和tail均指向它,head=tail即代表队列为空,否则不为空。这样push_back时先构造一个元素和空节点,再对tail加锁,设置tail节点元素为刚构造的元素,设置tail的next为新的空节点,设置tail为新空节点,解锁tail。只需对tail加锁。pop_front时,先对head加锁,再对tail加锁,检查队列是否为空(head==tail),解锁tail,根据队列是否为空取出元素,解锁head。对head和tail都加锁,但对tail加锁范围极小。具体实现代码如下:

#ifndef __THREADSAFE_QUEUE2_H_
#define __THREADSAFE_QUEUE2_H_

#include <mutex>
#include <memory>
#include <functional>
#include <condition_variable>

template <typename T>
class threadsafe_queue2
{

public:

threadsafe_queue2()
: head(new node)
, tail(head.get())
{}
threadsafe_queue2(const threadsafe_queue2&) = delete;
threadsafe_queue2 operator=(const threadsafe_queue2&) = delete;
~threadsafe_queue2() = default;

std::shared_ptr<T> try_pop()
{
std::unique_lock<std::mutex> ulkh(head_mut);
if (head.get() == get_tail())
return nullptr;
auto old_head = std::move(head);
head = std::move(old_head->next);
return old_head->data;
}

//此版本存在最后一个元素加入后,可能一直阻塞在wait中,下面一个函数不存在此问题
/*std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> ulkh(head_mut);
data_cond.wait(ulkh, [&]()
{
node *old_tail=get_tail();
return head.get() != old_tail;
});
auto old_head = std::move(head);
head = std::move(old_head->next);
return old_head->data;
}*/

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> ulkh(head_mut);
{
std::unique_lock<std::mutex> ulkt(tail_mut);
data_cond.wait(ulkt, [&]()
{
return head.get() != tail;
});
}
auto old_head = std::move(head);
head = std::move(old_head->next);
return old_head->data;
}

void push(T &&t)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::forward<T>(t)));
std::unique_ptr<node> new_tail(new node);
node *p = new_tail->get();
{
std::unique_lock<std::mutex> ulkt(tail_mut);
tail->data = new_data;
tail->next = std::move(new_tail);
tail = p;
}
data_cond.notify_one();
}

private:

struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};

std::mutex head_mut;
std::mutex tail_mut;
std::unique_ptr<node> head;
node *tail;
std::condition_variable data_cond;

private:

node* get_tail()
{
std::unique_lock<std::mutex> ulkt(tail_mut);
return tail;
}

};

template <typename T>
class threadsafe_queue2<T*>
{

public:

threadsafe_queue2()
: head(new node)
, tail(head)
{}
threadsafe_queue2(const threadsafe_queue2&) = delete;
threadsafe_queue2 operator=(const threadsafe_queue2&) = delete;

~threadsafe_queue2()
{
node *pre;
for (; head != tail;)
{
pre = head;
head = head->next;
delete pre;
}
delete tail;
}

T* try_pop()
{
node *old_head = nullptr;
{
std::unique_lock<std::mutex> ulkh(head_mut);
if (head == get_tail())
return nullptr;
old_head = head;
head = head->next;
}
T *data = old_head->data;
delete old_head;
return data;
}
//此版本存在最后一个元素加入后,可能一直阻塞在wait中,下面一个函数不存在此问题
/*T* wait_and_pop()
{
node *old_head = nullptr;
{
std::unique_lock<std::mutex> ulkh(head_mut);
data_cond.wait(ulkh, [&]()
{
node *old_tail = get_tail();
return head != old_tail;
});
old_head = head;
head = head->next;
}
T *data = old_head->data;
delete old_head;
return data;
}*/

T* wait_and_pop()
{
node *old_head = nullptr;
{
std::unique_lock<std::mutex> ulkh(head_mut);
{
std::unique_lock<std::mutex> ulkt(tail_mut);
data_cond.wait(ulkt, [&]()
{
return head != tail;
});
}
old_head = head;
head = head->next;
}
T *data = old_head->data;
delete old_head;
return data;
}

void push(T *t)
{
node *new_tail = new node;
{
std::unique_lock<std::mutex> ulkt(tail_mut);
tail->data = t;
tail->next = new_tail;
tail = new_tail;
}
data_cond.notify_one();
}

private:

struct node
{
T *data;
node *next;
};

std::mutex head_mut;
std::mutex tail_mut;
node *head;
node *tail;
std::condition_variable data_cond;

private:

node* get_tail()
{
std::unique_lock<std::mutex> ulkt(tail_mut);
return tail;
}

};

#endif





                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  c11 队列 threadsafe