您的位置:首页 > 其它

[多线程] 生产者消费者模型的BOOST实现

2016-02-17 16:23 549 查看

说明

如果在使用过程中发现 BUG,请一定告诉我:可以在下面留言,或者给我发邮件(sawpara at 126 dot com)。

使用boost::thread库来实现生产者消费者模型中的缓冲区!

仓库内最多可以存放 capacity 个产品。

条件变量 _condition_put 用于等待"仓库未满",即是否可以往仓库中存放一个产品。

条件变量 _condition_get 用于等待"仓库非空",即是否可以从仓库中取出一个产品。

互斥量 _mutexer 用于保证同一时刻只有一个线程能够访问仓库的内部状态。

实现

#include <queue>
#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"
template<class Product>
class Repository
{
public:
Repository() : _capacity(2), _emptynum(_capacity){}
Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}

// success : when new_capacity > _capacity - _emptynum
bool set_capacity(int new_capacity){
boost::mutex::scoped_lock lock();
if (_capacity - _emptynum < new_capacity ){
_emptynum = new_capacity - _capacity + _emptynum;
_capacity = new_capacity;
return true;
}
return false;
}

bool empty(){
boost::mutex::scoped_lock lock(_mutexer);
return _is_empty();
}

void put(Product &product){
{
boost::mutex::scoped_lock lock(_mutexer);
while (_is_full()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_put.wait(_mutexer);
}
_products.push(product); // need implement the copy constructor and =
_emptynum--; // decrease empty position
}
_condition_get.notify_one(); // have put one, one another thread can get product now
}

void get(Product &product){
{
// lock this repository
boost::mutex::scoped_lock lock(_mutexer);
while (_is_empty()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_get.wait(_mutexer);
}
product = _products.front(); // need implement the copy constructor and =
_products.pop();
_emptynum++; // increase empty position
}
// have removed one product, one another thread can put product now
_condition_put.notify_one();
}

private:
inline bool _is_full (){ return _emptynum ==         0; } // if have empty position
inline bool _is_empty(){ return _emptynum == _capacity; } //

private:
int _capacity; // capacity of this repository
int _emptynum; // number of empty position
std::queue<Product> _products; // products in a FIFO queue

boost::mutex _mutexer; // race access
boost::condition _condition_put;
boost::condition _condition_get;
};


实验

#include <iostream>
#include "boost/thread.hpp"
// Locked around every std::cout statement in producing() and consuming()
// so that output lines from different threads are not interleaved.
boost::mutex g_io_mutexer;

// Locked by producing() and consuming() whenever g_is_finished is accessed.
boost::mutex g_mutexer;
// Set to true by producing() after its last put().
bool g_is_finished = false;

// Buffer shared by the producer and the consumers, constructed with capacity 4.
Repository<int> g_repository(4);

// Producer thread body: announces and stores the products 0..99 one by one,
// then raises g_is_finished under g_mutexer.
void producing(){
    const int total = 100;
    int serial = 0;
    while (serial != total){
        {
            // Hold the I/O mutex only for the duration of the print.
            boost::mutex::scoped_lock io_guard(g_io_mutexer);
            std::cout << "Producing product : " << serial << std::endl;
        }
        g_repository.put(serial);
        ++serial;
    }
    boost::mutex::scoped_lock flag_guard(g_mutexer);
    g_is_finished = true;
}

// Consumer thread body: repeatedly removes a product from g_repository and
// prints it; returns once the producer has finished and the repository is
// empty.
//
// The emptiness check and the removal are done together under g_mutexer.
// Checking first and then calling the blocking get() outside the lock lets a
// consumer enter get() after the last product has already been taken (or
// before the finished flag is set) and block forever.
void consuming(){
    int product;
    while (true){
        bool got_product = false;
        {
            boost::mutex::scoped_lock lock(g_mutexer);
            if (!g_repository.empty()){
                // Every consumer removes only while holding g_mutexer, so the
                // product observed by empty() is still there and get() does
                // not block.
                g_repository.get(product);
                got_product = true;
            } else if (g_is_finished){
                // Producer is done and nothing is left to consume.
                break;
            }
        }
        if (!got_product){
            // Nothing available yet: give the producer a chance to run, then
            // check again.
            boost::this_thread::yield();
            continue;
        }
        {
            boost::mutex::scoped_lock lock(g_io_mutexer);
            std::cout << "Consuming product : " << product << std::endl;
        }
    }
}

int main(){
boost::thread producer(producing);
boost::thread consumer_1(consuming);
boost::thread consumer_2(consuming);
producer.join();
consumer_1.join();
consumer_2.join();
return 0;
}


all the code

为方便理解代码,下面的代码中,三个空行一个意群。

#include <iostream>
#include <queue>

#include "boost/thread.hpp"

#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"

template<class Product>
class Repository
{
public:
Repository() : _capacity(2), _emptynum(_capacity){}
Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}

// success : when new_capacity > _capacity - _emptynum
bool set_capacity(int new_capacity){
boost::mutex::scoped_lock lock();
if (_capacity - _emptynum < new_capacity ){
_emptynum = new_capacity - _capacity + _emptynum;
_capacity = new_capacity;
return true;
}
return false;
}

bool empty(){
boost::mutex::scoped_lock lock(_mutexer);
return _is_empty();
}

void put(Product &product){
{
boost::mutex::scoped_lock lock(_mutexer);
while (_is_full()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_put.wait(_mutexer);
}
_products.push(product); // need implement the copy constructor and =
_emptynum--; // decrease empty position
}
_condition_get.notify_one(); // have put one, one another thread can get product now
}

void get(Product &product){
{
// lock this repository
boost::mutex::scoped_lock lock(_mutexer);
while (_is_empty()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_get.wait(_mutexer);
}
product = _products.front(); // need implement the copy constructor and =
_products.pop();
_emptynum++; // increase empty position
}
// have removed one product, one another thread can put product now
_condition_put.notify_one();
}

private:
inline bool _is_full (){ return _emptynum ==         0; } // if have empty position
inline bool _is_empty(){ return _emptynum == _capacity; } //

private:
int _capacity; // capacity of this repository
int _emptynum; // number of empty position
std::queue<Product> _products; // products in a FIFO queue

boost::mutex _mutexer; // race access
boost::condition _condition_put;
boost::condition _condition_get;
};

// Locked around every std::cout statement in producing() and consuming()
// so that output lines from different threads are not interleaved.
boost::mutex g_io_mutexer;

// Locked by producing() and consuming() whenever g_is_finished is accessed.
boost::mutex g_mutexer;
// Set to true by producing() after its last put().
bool g_is_finished = false;

// Buffer shared by the producer and the consumers, constructed with capacity 4.
Repository<int> g_repository(4);

// Producer thread body: announces and stores the products 0..99 one by one,
// then raises g_is_finished under g_mutexer.
void producing(){
    const int total = 100;
    int serial = 0;
    while (serial != total){
        {
            // Hold the I/O mutex only for the duration of the print.
            boost::mutex::scoped_lock io_guard(g_io_mutexer);
            std::cout << "Producing product : " << serial << std::endl;
        }
        g_repository.put(serial);
        ++serial;
    }
    boost::mutex::scoped_lock flag_guard(g_mutexer);
    g_is_finished = true;
}

// Consumer thread body: repeatedly removes a product from g_repository and
// prints it; returns once the producer has finished and the repository is
// empty.
//
// The emptiness check and the removal are done together under g_mutexer.
// Checking first and then calling the blocking get() outside the lock lets a
// consumer enter get() after the last product has already been taken (or
// before the finished flag is set) and block forever.
void consuming(){
    int product;
    while (true){
        bool got_product = false;
        {
            boost::mutex::scoped_lock lock(g_mutexer);
            if (!g_repository.empty()){
                // Every consumer removes only while holding g_mutexer, so the
                // product observed by empty() is still there and get() does
                // not block.
                g_repository.get(product);
                got_product = true;
            } else if (g_is_finished){
                // Producer is done and nothing is left to consume.
                break;
            }
        }
        if (!got_product){
            // Nothing available yet: give the producer a chance to run, then
            // check again.
            boost::this_thread::yield();
            continue;
        }
        {
            boost::mutex::scoped_lock lock(g_io_mutexer);
            std::cout << "Consuming product : " << product << std::endl;
        }
    }
}

int main(){
boost::thread producer(producing);
boost::thread consumer_1(consuming);
boost::thread consumer_2(consuming);
producer.join();
consumer_1.join();
consumer_2.join();
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: