您的位置:首页 > 编程语言 > C语言/C++

[C++11 并发编程] 12 使用条件变量创建线程间安全的队列

2015-08-26 21:28 543 查看
之前有一节中,我们使用mutex实现了一个线程间安全的堆栈。这一节,我们使用条件变量来实现一个线程间安全的队列。

标准库中的std::queue<>的接口定义如下:

template <class T, class Container = std::deque<T> >
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
queue(queue&& q);

template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);

queue& operator=(queue&& q);
void swap(queue&& q);

bool empty() const;
size_type size() const;

T& front();
const T& front() const;
T& back();
const T& back() const;

void push(const T& x);
void push(T&& x);
void pop();
};
忽略构造,赋值和交换运算,还剩下以下三类操作:

查询队列状态:empty()和size()
查询队列中的元素:front()和back()
修改队列中的元素:push()、pop()和emplace()

和stack类似,这些接口对存在竞争条件。我们需要合并front()和pop()操作。这里我们需要实现pop操作的两个变种:

try_pop():尝试从队列中pop数据并立即返回。
wait_and_pop():等待挂起直到队列中有数据被获取。

#include <memory>	// 为了使用std::shared_ptr
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
// 为了简化实现,禁用赋值运算操作
threadsafe_queue& operator=(const threadsafe_queue&) = delete;

void push(T new_value);

// 返回被获取数据的引用,返回值作为pop成功和失败的标志
bool try_pop(T& value);
// 返回值为NULL则表示获取失败,否则返回指向被pop数据的指针
std::shared_ptr<T> try_pop();

void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();

bool empty() const;
};

修改前一节的程序,使用这个队列:

#include <iostream>

#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>

template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};

static bool more = true;

bool more_data_to_prepare()
{
return more;
}

struct data_chunk
{
char m_data = 'q';
data_chunk(char c) : m_data(c) {
}

data_chunk() : m_data('q') {
}
};

data_chunk prepare_data()
{
std::cout << "data_preparation_thread prepare_data"<< std::endl;
char x = 'q';
std::cin >> x;
if (x == 'q')
{
more = false;
}
return data_chunk(x);
}

void process(data_chunk& data)
{
std::cout << "process data: " << data.m_data << std::endl;
}

bool is_last_chunk(data_chunk& data)
{
if (data.m_data == 'q')
{
return true;
}

return false;
}

threadsafe_queue<data_chunk> data_queue;	// 用于线程间通信的队列
// mutex和条件变量都已放入到threadsafe_queue中,去掉相应的全局变量

void data_preparation_thread()
{
while(more_data_to_prepare())
{
std::cout << "data_preparation_thread while" << std::endl;
data_chunk const data=prepare_data();
// 数据准备好后,将数据插入队列之中,不在需要额外的同步操作
data_queue.push(data);
}
}

void data_processing_thread()
{
while(true)
{
std::cout << "data_processing_thread while" << std::endl;
data_chunk data;
// wait_and_pop实现了相应的“等待”操作
data_queue.wait_and_pop(data);
std::cout << "data_processing_thread process data" << std::endl;
process(data);
if(is_last_chunk(data))
break;
}
}

int main()
{
std::cout << "main" << std::endl;
std::thread t1(data_preparation_thread);
std::thread t2(data_processing_thread);

t1.join();
t2.join();
}
程序执行效果如下:

main
data_preparation_thread while
data_processing_thread while
data_preparation_thread prepare_data
a
data_preparation_thread while
data_preparation_thread prepare_data
data_processing_thread process data
process data: a
data_processing_thread while
q
data_processing_thread process data
process data: q

--------------------------------
Process exited after 2.937 seconds with return value 0
请按任意键继续. . .
线程间安全队列的完整实现如下:

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 为了是mut可以在const函数中被修改,声明为mutable
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty)
return false;
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

条件变量在多个线程等待同一个事件时,也是很有用的。当线程用来分解工作负载,并且只有一个线程可以对通知做出反应,与上述实例中使用的结构完全相同;运行多个数据实例——处理线程(processing thread)。当新的数据准备完成,调用notify_one()将会触发一个正在执行wait()的线程,去检查条件和wait()函数的返回状态(因为你仅是向data_queue添加了一个数据项)。 这里不保证哪一个线程会被通知到,即使只有一个等待线程被通知时,所有处线程也有可能都在处理数据。

另一种可能是,很多线程等待同一事件,对于通知他们都需要做出回应。这会发生在共享数据正在初始化的时候,当处理线程可以使用同一数据时,就要等待数据被初始化(有不错的机制可用来应对;或等待共享数据的更新,比如,定期重新初始化(periodic reinitialization)。在这些情况下,准备线程准备数据数据时,就会通过条件变量调用notify_all()成员函数,而非直接调用notify_one()函数。顾名思义,这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。

当等待线程只等待一次,当条件为true时,它就不会再等待条件变量了,这种情况下,使用一个条件变量并非同步机制的最好选择。尤其是,要等待的是一小块数据被准备好块时。如果是这样,期望(future)可能是一个更适合的方法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: