[多线程] 生产者消费者模型的BOOST实现
2016-02-17 16:23
549 查看
说明
如果 使用过程中有BUG 一定要告诉我:在下面留言或者给我邮件(sawpara at 126 dot com)使用boost::thread库来实现生产者消费者模型中的缓冲区!
仓库内最多可以存放 capacity 个产品。
条件变量 condition_put 标记是否可以往仓库中存放一个产品。
条件变量 condition_get 标记是否可以从仓库中取出一个产品。
互斥量 mutexer 用于保证当前仓库只有一个线程拥有主权。
实现
#include <queue> #include "boost/thread/condition.hpp" #include "boost/thread/mutex.hpp" template<class Product> class Repository { public: Repository() : _capacity(2), _emptynum(_capacity){} Repository(int capacity) : _emptynum(capacity), _capacity(capacity){} // success : when new_capacity > _capacity - _emptynum bool set_capacity(int new_capacity){ boost::mutex::scoped_lock lock(); if (_capacity - _emptynum < new_capacity ){ _emptynum = new_capacity - _capacity + _emptynum; _capacity = new_capacity; return true; } return false; } bool empty(){ boost::mutex::scoped_lock lock(_mutexer); return _is_empty(); } void put(Product &product){ { boost::mutex::scoped_lock lock(_mutexer); while (_is_full()){ // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called. _condition_put.wait(_mutexer); } _products.push(product); // need implement the copy constructor and = _emptynum--; // decrease empty position } _condition_get.notify_one(); // have put one, one another thread can get product now } void get(Product &product){ { // lock this repository boost::mutex::scoped_lock lock(_mutexer); while (_is_empty()){ // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called. _condition_get.wait(_mutexer); } product = _products.front(); // need implement the copy constructor and = _products.pop(); _emptynum++; // increase empty position } // have removed one product, one another thread can put product now _condition_put.notify_one(); } private: inline bool _is_full (){ return _emptynum == 0; } // if have empty position inline bool _is_empty(){ return _emptynum == _capacity; } // private: int _capacity; // capacity of this repository int _emptynum; // number of empty position std::queue<Product> _products; // products in a FIFO queue boost::mutex _mutexer; // race access boost::condition _condition_put; boost::condition _condition_get; };
实验
#include <iostream> #include "boost/thread.hpp" boost::mutex g_io_mutexer; boost::mutex g_mutexer; bool g_is_finished = false; Repository<int> g_repository(4); void producing(){ for (int i = 0; i < 100; i++){ { boost::mutex::scoped_lock lock(g_io_mutexer); std::cout << "Producing product : " << i << std::endl; } g_repository.put(i); } boost::mutex::scoped_lock lock(g_mutexer); g_is_finished = true; } void consuming(){ int product; while (true){ { boost::mutex::scoped_lock lock(g_mutexer); if (g_is_finished && g_repository.empty()){ break; } } g_repository.get(product); { boost::mutex::scoped_lock lock(g_io_mutexer); std::cout << "Consuming product : " << product << std::endl; } } } int main(){ boost::thread producer(producing); boost::thread consumer_1(consuming); boost::thread consumer_2(consuming); producer.join(); consumer_1.join(); consumer_2.join(); return 0; }
all the code
为方便理解代码,下面的代码中,三个空行一个意群。#include <iostream> #include <queue> #include "boost/thread.hpp" #include "boost/thread/condition.hpp" #include "boost/thread/mutex.hpp" template<class Product> class Repository { public: Repository() : _capacity(2), _emptynum(_capacity){} Repository(int capacity) : _emptynum(capacity), _capacity(capacity){} // success : when new_capacity > _capacity - _emptynum bool set_capacity(int new_capacity){ boost::mutex::scoped_lock lock(); if (_capacity - _emptynum < new_capacity ){ _emptynum = new_capacity - _capacity + _emptynum; _capacity = new_capacity; return true; } return false; } bool empty(){ boost::mutex::scoped_lock lock(_mutexer); return _is_empty(); } void put(Product &product){ { boost::mutex::scoped_lock lock(_mutexer); while (_is_full()){ // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called. _condition_put.wait(_mutexer); } _products.push(product); // need implement the copy constructor and = _emptynum--; // decrease empty position } _condition_get.notify_one(); // have put one, one another thread can get product now } void get(Product &product){ { // lock this repository boost::mutex::scoped_lock lock(_mutexer); while (_is_empty()){ // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called. _condition_get.wait(_mutexer); } product = _products.front(); // need implement the copy constructor and = _products.pop(); _emptynum++; // increase empty position } // have removed one product, one another thread can put product now _condition_put.notify_one(); } private: inline bool _is_full (){ return _emptynum == 0; } // if have empty position inline bool _is_empty(){ return _emptynum == _capacity; } // private: int _capacity; // capacity of this repository int _emptynum; // number of empty position std::queue<Product> _products; // products in a FIFO queue boost::mutex _mutexer; // race access boost::condition _condition_put; boost::condition _condition_get; }; boost::mutex g_io_mutexer; boost::mutex g_mutexer; bool g_is_finished = false; Repository<int> g_repository(4); void producing(){ for (int i = 0; i < 100; i++){ { boost::mutex::scoped_lock lock(g_io_mutexer); std::cout << "Producing product : " << i << std::endl; } g_repository.put(i); } boost::mutex::scoped_lock lock(g_mutexer); g_is_finished = true; } void consuming(){ int product; while (true){ { boost::mutex::scoped_lock lock(g_mutexer); if (g_is_finished && g_repository.empty()){ break; } } g_repository.get(product); { boost::mutex::scoped_lock lock(g_io_mutexer); std::cout << "Consuming product : " << product << std::endl; } } } int main(){ boost::thread producer(producing); boost::thread consumer_1(consuming); boost::thread consumer_2(consuming); producer.join(); consumer_1.join(); consumer_2.join(); return 0; }
相关文章推荐
- Reactjs Mixins
- 如何打log 检查 Android CTS failure
- [多线程] 生产者消费者模型的BOOST实现
- SpringMVC项目,启动项目怎么总是报找不到log4j.properties文件
- ruby 2.1安装mysql2 gem 报错
- android 动画
- Java重定向标准输入/输出
- 有用的shell命令集锦
- 利用ThreadLocal模式管理Session
- 查看Linux input子系统信息
- 使用springmvc编写restful接口
- [转载] How Many Partitions Does An RDD Have
- MongoDB 学习笔记(五)索引
- JavaScript中匀速运动和变速运动的代码总结
- C# xml可序列化多值枚举脚本
- KVM虚拟化笔记(十七)------V2V迁移(二)ESXI 到KVM迁移
- HTTPServerMock从手工到平台的演变
- 读取指定目录下的所有文件生成一个excel
- iOS Google Analytics(谷歌分析)开发
- MySQL删除重复数据