muduo::BlockingQueue、BoundedBlockingQueue分析
2015-07-29 15:56
369 查看
BlockingQueue
BoundedBlockingQueue
在学习源码之前,先了解一个概念:有界缓冲和无界缓冲。
以生产者、消费者模型。
有界缓冲是指生产者在向仓库添加数据时要先判断仓库是否已满,如果已满则通知消费者来取走数据。消费者在消费时,先判断仓库是否已空,如果是则先通知生产者生产数据。
BlockingQueue就是这样的一个模型
可以写个生产者消费者测试代码,有了这个BlockingQueue,不用自己在管理互斥量和信号量了。
当生产者生产时,会先判断仓库是否已满,如果是则等待仓库非满的信号;否则则向仓库添加货物,之后通知消费者仓库非空。
当消费者取货物时会先判断仓库是否为空,如果是则等待仓库非空信号;否则取走货物,通知生产者仓库非满。
这时候实现生产者消费者模型时需要2个信号量,一个是非空,表示消费者可以消费了;一个是非满,表示生产者可以生产了。源码如下:
测试代码:
BoundedBlockingQueue
在学习源码之前,先了解一个概念:有界缓冲和无界缓冲。
以生产者、消费者模型。
有界缓冲是指生产者在向仓库添加数据时要先判断仓库是否已满,如果已满则通知消费者来取走数据。消费者在消费时,先判断仓库是否已空,如果是则先通知生产者生产数据。
BlockingQueue
在无界缓冲中,生产者不用关心仓库是否已满,只需添加数据;消费者在判断仓库已空时要等待生产者的信号。这时只需要用一个信号量。BlockingQueue就是这样的一个模型
template<typename T> class BlockingQueue : boost::noncopyable { public: BlockingQueue() : mutex_(),//先初始化互斥量 notEmpty_(mutex_),//再用互斥量初始化信号了 queue_() { } void put(const T& x)//生产数据 { MutexLockGuard lock(mutex_); queue_.push_back(x); notEmpty_.notify(); // wait morphing saves us // http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/ } #ifdef __GXX_EXPERIMENTAL_CXX0X__ void put(T&& x)//右值 { MutexLockGuard lock(mutex_); queue_.push_back(std::move(x)); notEmpty_.notify(); } // FIXME: emplace() #endif T take()//消费数据 { MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup while (queue_.empty())//仓库已空 { notEmpty_.wait();//等待生产者信号 } assert(!queue_.empty()); #ifdef __GXX_EXPERIMENTAL_CXX0X__ T front(std::move(queue_.front())); #else T front(queue_.front()); #endif queue_.pop_front(); return front; } size_t size() const { MutexLockGuard lock(mutex_); return queue_.size(); } private: mutable MutexLock mutex_;//互斥量 Condition notEmpty_;//信号量 std::deque<T> queue_;//仓库 };
可以写个生产者消费者测试代码,有了这个BlockingQueue,不用自己在管理互斥量和信号量了。
#include <muduo/base/BlockingQueue.h> #include <muduo/base/Thread.h> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <iostream> using namespace muduo; using namespace boost; void Produce(shared_ptr<BlockingQueue<int> > queue) { while(true) { int product=rand()%1000+1; std::cout<<"Produce: "<<product<<std::endl; queue->put(product); sleep(rand()%5); } } void Consome(shared_ptr<BlockingQueue<int> > queue) { while(true) { int product=queue->take(); std::cout<<"Consome: "<<product<<std::endl; } } int main() { shared_ptr<BlockingQueue<int> > blockingQueue(new BlockingQueue<int>); Thread t1(boost::bind(Produce, blockingQueue)); Thread t2(boost::bind(Consome, blockingQueue)); t1.start(); t2.start(); t1.join(); t2.join(); return 0; }
BoundedBlockingQueue
与BlockingQueue不同,BoundedBlockingQueue是有边界的,即仓库是有限的。这时仓库有四个状态:非空、已空;非满、已满。当生产者生产时,会先判断仓库是否已满,如果是则等待仓库非满的信号;否则则向仓库添加货物,之后通知消费者仓库非空。
当消费者取货物时会先判断仓库是否为空,如果是则等待仓库非空信号;否则取走货物,通知生产者仓库非满。
这时候实现生产者消费者模型时需要2个信号量,一个是非空,表示消费者可以消费了;一个是非满,表示生产者可以生产了。源码如下:
template<typename T> class BoundedBlockingQueue : boost::noncopyable { public: explicit BoundedBlockingQueue(int maxSize)//最大容量 : mutex_(), notEmpty_(mutex_), notFull_(mutex_), queue_(maxSize) { } void put(const T& x) { MutexLockGuard lock(mutex_); while (queue_.full())//仓库已满 { notFull_.wait();//等待非满信号,即消费者消费后会通知 } assert(!queue_.full()); queue_.push_back(x); notEmpty_.notify();//通知消费者仓库已经有货(非空) } T take() { MutexLockGuard lock(mutex_); while (queue_.empty())//仓库已空 { notEmpty_.wait();//等待生产者向仓库添加货物 } assert(!queue_.empty()); T front(queue_.front()); queue_.pop_front(); notFull_.notify();//通知生产者仓库已经非空了 return front; } bool empty() const { MutexLockGuard lock(mutex_); return queue_.empty(); } bool full() const { MutexLockGuard lock(mutex_); return queue_.full(); } size_t size() const { MutexLockGuard lock(mutex_); return queue_.size(); } size_t capacity() const { MutexLockGuard lock(mutex_); return queue_.capacity(); } private: mutable MutexLock mutex_; Condition notEmpty_;//非空信号量 Condition notFull_;//非满信号量 boost::circular_buffer<T> queue_; };
测试代码:
//boundedBlokcingQueue.cpp #include <muduo/base/BoundedBlockingQueue.h> #include <muduo/base/Thread.h> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <iostream> using namespace muduo; using namespace boost; void Produce(shared_ptr<BoundedBlockingQueue<int> > queue) { while(true) { int product=rand()%1000+1; std::cout<<"Produce: "<<product<<std::endl; queue->put(product); sleep(rand()%5); } } void Consome(shared_ptr<BoundedBlockingQueue<int> > queue) { while(true) { int product=queue->take(); std::cout<<"Consome: "<<product<<std::endl; sleep(rand()%5); } } int main() { shared_ptr<BoundedBlockingQueue<int> > boundedBlockingQueue(new BoundedBlockingQueue<int>(5)); Thread t1(boost::bind(Produce, boundedBlockingQueue)); Thread t2(boost::bind(Consome, boundedBlockingQueue)); t1.start(); t2.start(); t1.join(); t2.join(); return 0; }
相关文章推荐
- SPOJ 1771 NQUEEN Yet Another N-Queen Problem
- 通用Key-Value存储系统的存储管理策略解析
- Architecture,Valid architectures,Build Active Architecture Only
- 配置文件读取
- UISearchBar 点击取消回到原来位置时会跳动的解决方法
- Maximum Subarray
- Qt 5.3 下OpenCV 2.4.11 开发(3)简单的GUI项目
- Daikon Forge GUI Library(dfgui)之Scroll Panel
- [UIScreen mainScreen].bounds.size获取设备长宽时的问题
- UILable 设置对齐方式扩展
- 用户选择一个value过滤数据
- 初学miniui之miniui的使用
- easyui获取datagrid所有行、变化行、更新行等数据的方法
- Result Maps collection already contains value for
- UIAlertView,POP返回键盘闪现
- Android学习心得(9) --- ndk-build脚本参数用法
- Easyui 中的placeholder属性
- iOS UIButton按钮图片在上文字在下
- UI 总结-1 UIButton
- iOS UI第一周总结