
Implementing a Thread-Safe Queue using Condition Variables

The author is Anthony Williams, currently a maintainer of the Boost library.

Original article:


Implementing a Thread-Safe Queue using Condition Variables


One problem that comes up time and again with multi-threaded code is how to transfer data from one thread to another. For example, one common way to parallelize a serial algorithm is to split it into
independent chunks and make a pipeline — each stage in the pipeline can be run on a separate thread, and each stage adds the data to the input queue for the next stage when it's done. For this to work properly, the input queue needs to be written so that data
can safely be added by one thread and removed by another thread without corrupting the data structure.

Basic Thread Safety with a Mutex



The simplest way of doing this is just to wrap a non-thread-safe queue, and protect it with a mutex (the examples use the types and functions from the upcoming 1.35 release of Boost):


#include <queue>
#include <boost/thread/mutex.hpp>

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex; // mutable so the const members can lock it
public:
    void push(const Data& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    Data& front()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    Data const& front() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    void pop()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.pop();
    }
};
This design is subject to race conditions between calls to empty, front and pop if there is more than one thread removing items from the queue, but in a single-consumer system
(as being discussed here), this is not a problem. There is, however, a downside to such a simple implementation: if your pipeline stages are running on separate threads, they likely have nothing to do if the queue is empty, so they end up with a wait loop:


while(some_queue.empty())
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
}
Though the sleep avoids the high CPU consumption of a direct busy wait, there are still some obvious downsides to this formulation. Firstly, the thread has to wake every 50ms or so (or
whatever the sleep period is) in order to lock the mutex, check the queue, and unlock the mutex, forcing a context switch. Secondly, the sleep period imposes a limit on how fast the thread can respond to data being added to the queue — if the data is added
just before the call to sleep, the thread will wait at least 50ms before checking for data. On average, the thread will only respond to data after about half the sleep time (25ms here).


Waiting with a Condition Variable



As an alternative to continuously polling the state of the queue, the sleep in the wait loop can be replaced with a condition variable wait. If the condition variable is notified in push when data is
added to an empty queue, then the waiting thread will wake. This requires access to the mutex used to protect the queue, so needs to be implemented as a member function of concurrent_queue:


template<typename Data>
class concurrent_queue
{
private:
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
    }

    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};
There are three important things to note here. Firstly, the lock variable is passed as a parameter to wait — this allows the condition variable implementation to atomically unlock the
mutex and add the thread to the wait queue, so that another thread can update the protected data whilst the first thread waits.


Secondly, the condition variable wait is still inside a while loop — condition variables can be subject to spurious wake-ups, so it is important to check the actual condition being waited for when the call to wait returns.

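As an aside (not part of the original article), Boost's condition_variable also provides a predicate overload of wait that performs this re-checking loop internally. A minimal sketch of wait_for_data written that way, assuming the same members as above and a small hand-written predicate functor (so no C++11 support is needed):

template<typename Data>
class concurrent_queue
{
    // hypothetical helper: a functor that returns true when the queue has data
    struct queue_not_empty
    {
        std::queue<Data> const& queue;
        explicit queue_not_empty(std::queue<Data> const& q): queue(q) {}
        bool operator()() const { return !queue.empty(); }
    };
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        // wait(lock, predicate) re-checks the predicate after every wake-up,
        // so spurious wake-ups are handled without an explicit while loop
        the_condition_variable.wait(lock, queue_not_empty(the_queue));
    }
    // rest as before
};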

Be careful when you notify


Thirdly, the call to notify_one comes after the data is pushed on the internal queue. This avoids the waiting thread being notified if the call to the_queue.push throws an exception. As written, the call to notify_one
is still within the protected region, which is potentially sub-optimal: the waiting thread might wake up immediately it is notified, and before the mutex is unlocked, in which case it will have to block when the mutex is reacquired on the exit from wait. By
rewriting the function so that the notification comes after the mutex is unlocked, the waiting thread will be able to acquire the mutex without blocking:


template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);

        lock.unlock(); // unlock the mutex

        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};


Reducing the locking overhead


Though the use of a condition variable has improved the pushing and waiting side of the interface, the interface for the consumer thread still has to perform excessive locking: wait_for_data,
front and pop all lock the mutex, yet they will be called in quick succession by the consumer thread.


By changing the consumer interface to a single wait_and_pop function, the extra lock/unlock calls can be avoided:


template<typename Data>
class concurrent_queue
{
public:
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

    // rest as before
};
A reference parameter is used to receive the result and transfer ownership out of the queue in order to avoid the exception safety issues of returning data by-value: if the copy constructor of a by-value return throws, then the data has been removed from the queue, but is lost, whereas with this approach, the potentially problematic copy is performed prior to modifying the queue (see Herb Sutter's Guru Of The Week #8 for a discussion of the issues). This does, of course, require that an instance of Data can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional to avoid this requirement, as sketched below.

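One possible shape for that (an illustrative sketch, not code from the original article) is a wait_and_pop variant that fills a boost::optional<Data> supplied by the caller, so the caller never has to construct a Data up front; it assumes the same members as the classes above:

#include <boost/optional.hpp>

template<typename Data>
class concurrent_queue
{
public:
    // hypothetical variant: the caller passes an empty boost::optional<Data>
    void wait_and_pop(boost::optional<Data>& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        // assigning to the optional copy-constructs the Data in place,
        // and the copy still happens before the queue is modified
        popped_value=the_queue.front();
        the_queue.pop();
    }
    // rest as before
};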

Handling multiple consumers



As well as removing the locking overhead, the combined wait_and_pop function has another benefit — it automatically allows for multiple consumers. Whereas the fine-grained nature of the separate functions
makes them subject to race conditions without external locking (one reason why the authors of the SGI STL advocate against making things like std::vector thread-safe — you need external locking to do many common operations, which makes the internal locking
just a waste of resources), the combined function safely handles concurrent calls.


If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop, and everything works fine. If the queue is empty, then each thread in turn will block waiting
on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop concurrently,
the while loop ensures that only one thread will do the pop, and the others will wait.


Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all() instead of notify_one() when waking threads, or call notify_one() whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn't enough data for them. If we notify with every push() then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:


template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }
    // rest as before
};
There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty. empty itself still
works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread calls wait_and_pop, whether it was true or false. For this reason it is worth adding an additional
function: try_pop, which returns true if there was a value to retrieve (in which case it retrieves it), or false to indicate that the queue was empty.


template<typename Data>
class concurrent_queue
{
public:
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }
    // rest as before
};

By removing the separate front and pop functions, our simple naive implementation has now become a usable multiple producer, multiple consumer concurrent queue.


The Final Code



Here is the final code for a simple thread-safe multiple producer, multiple consumer queue:


#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock(); // unlock before notifying, so the woken thread can acquire the mutex without blocking
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }
};
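
For completeness, here is a minimal usage sketch (not part of the original article) with one producer thread and one consumer thread sharing the queue; the function names and the item count are illustrative only:

#include <iostream>
#include <boost/thread/thread.hpp>

concurrent_queue<int> the_shared_queue;

void producer()
{
    for(int i=0;i<10;++i)
    {
        the_shared_queue.push(i); // wakes the consumer if it is waiting
    }
}

void consumer()
{
    for(int i=0;i<10;++i)
    {
        int value;
        the_shared_queue.wait_and_pop(value); // blocks until an item is available
        std::cout<<value<<std::endl;
    }
}

int main()
{
    boost::thread consumer_thread(consumer);
    boost::thread producer_thread(producer);
    producer_thread.join();
    consumer_thread.join();
    return 0;
}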