muduo源码分析:无界队列和有界队列(消费者-生产者)
2015-12-09 17:11
344 查看
muduo库的队列很简单:
take--消费
size--返回现在的产品数
产品存放在deque里面
无界队列可以一直生产put,所以不用等待队列中有位置才生产,每生产一个产品就notify,只要通知一个消费者,不需要通知所有人
只有消费take时要判断有无产品,所以只需要一个mutex锁,一个条件变量notEmpty,消费者用来等待notEmpty有产品可以消费
因为有界,所以生产的时候要等待有位置才能生产--用条件变量notFull
消费时用notEmpty条件变量等待有产品可以消费。
生产者和消费者处理的资源都是同一个queue_,所以是两个条件变量(一个等待非空,一个等待非满),但是只要一把mutex_锁,两个条件变量共用一把锁
如果不用boost的circular_buffer。可以用数组实现,putIndex=0 将要生产的位置;takeInex=0 将要读的位置,isFull=false 产品是否满了 , isEmpty=ture 产品空了;
put( ){
lock( );
produce;//putIndex=0,刚开始直接生产没问题,但是把wait写后面在(2)的位置就错了---------------(1)
notify(notEmpty);
putIIndex++;
putIndex%=maxSize;
isEmpty=fase;
if(putIndex==takeIndex) //没有位置可以生产了
{
isFull=true;
wait(notFull); //不可以,错了---------------------------------------(2)
}
}
错误1:put操作,把产品都已经生产了加进去了,但是却还阻塞了,不返回。不符合常规逻辑
错误2:第一次 putIndex=0,刚开始直接生产没问题(加了锁,多个线程也没问题,是串行的)。但是后面有问题:前一个线程刚把队列填满了,自己阻塞了,但其他线程却没有判断就直接生产了,而事实上队列已经满了。所以应该要下面这样:
put( ){
lock( );
while( isFull ) // 没有位置可以生产了
{
wait(notFull);
}
produce;
isEmpty=fase;
putIIndex++;
putIndex%=maxSize;
if(putIndex==takeIndex) //满了
isFull=true;
notify(notEmpty);
}
take()
{
lock( );
while(isEmpty)
wait(notEmpty);//不能直接消费
cosumer;
isFull=false;
takeIndex++;
takeIndex%=maxSize;
if(putIndex==takeIndex) //没有产品消费了,空了
isEmpty=true;
notify(notFull);
}
参考:c++教程网
muduo网络库
linux多线程服务器端编程》.陈硕
无界队列
put --生产take--消费
size--返回现在的产品数
产品存放在deque里面
无界队列可以一直生产put,所以不用等待队列中有位置才生产,每生产一个产品就notify,只要通知一个消费者,不需要通知所有人
只有消费take时要判断有无产品,所以只需要一个mutex锁,一个条件变量notEmpty,消费者用来等待notEmpty有产品可以消费
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H #define MUDUO_BASE_BLOCKINGQUEUE_H #include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <boost/noncopyable.hpp> #include <deque> #include <assert.h> namespace muduo { template<typename T> class BlockingQueue : boost::noncopyable { public: explicit BlockingQueue() :mutex_(), notEmptyCond_(mutex_), queue_() {} void put(const T& t) { { MutexLockGuard lock(mutex_); queue_.push_back(t); } notEmptyCond_.notify();//不需要notifyAll通知所有的,如果有阻塞,代表原先为空,现在只生产了一个,也只要一个消费来就足够了 } T take() { MutexLockGuard lock(mutex_); while(queue_.empty()) notEmptyCond_.wait(); assert(!queue_.empty()); T front=queue_.front(); queue_.pop_front(); return front; } size_t size() const { MutexLockGuard lock(mutex_); return queue_.size(); } private: mutable MutexLock mutex_; Condition notEmptyCond_; std::deque<T> queue_; }; } #endif
有界队列
有界队列用boost::circular_buffer<T> queue_实现因为有界,所以生产的时候要等待有位置才能生产--用条件变量notFull
消费时用notEmpty条件变量等待有产品可以消费。
生产者和消费者处理的资源都是同一个queue_,所以是两个条件变量(一个等待非空,一个等待非满),但是只要一把mutex_锁,两个条件变量共用一把锁
// Use of this source code is governed by a BSD-style license // that can be found in the License file. // // Author: Shuo Chen (chenshuo at chenshuo dot com) #ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H #define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H #include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <boost/circular_buffer.hpp> #include <boost/noncopyable.hpp> #include <assert.h> namespace muduo { template<typename T> class BoundedBlockingQueue : boost::noncopyable { public: explicit BoundedBlockingQueue(int maxSize) : mutex_(), notEmpty_(mutex_), notFull_(mutex_), queue_(maxSize) { } void put(const T& x) { MutexLockGuard lock(mutex_); while (queue_.full()) { notFull_.wait(); } assert(!queue_.full()); queue_.push_back(x); notEmpty_.notify(); // TODO: move outside of lock } T take() { MutexLockGuard lock(mutex_); while (queue_.empty()) { notEmpty_.wait(); } assert(!queue_.empty()); T front(queue_.front()); queue_.pop_front(); notFull_.notify(); // TODO: move outside of lock return front; } bool empty() const { MutexLockGuard lock(mutex_); return queue_.empty(); } bool full() const { MutexLockGuard lock(mutex_); return queue_.full(); } size_t size() const { MutexLockGuard lock(mutex_); return queue_.size(); } size_t capacity() const { MutexLockGuard lock(mutex_); return queue_.capacity(); } private: mutable MutexLock mutex_;//一把锁,两个条件变量共用一把锁 Condition notEmpty_; Condition notFull_; boost::circular_buffer<T> queue_; }; } #endif // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
如果不用boost的circular_buffer。可以用数组实现,putIndex=0 将要生产的位置;takeInex=0 将要读的位置,isFull=false 产品是否满了 , isEmpty=ture 产品空了;
put( ){
lock( );
produce;//putIndex=0,刚开始直接生产没问题,但是把wait写后面在(2)的位置就错了---------------(1)
notify(notEmpty);
putIIndex++;
putIndex%=maxSize;
isEmpty=fase;
if(putIndex==takeIndex) //没有位置可以生产了
{
isFull=true;
wait(notFull); //不可以,错了---------------------------------------(2)
}
}
错误1:put操作,把产品都已经生产了加进去了,但是却还阻塞了,不返回。不符合常规逻辑
错误2:第一次 putIndex=0,刚开始直接生产没问题(加了锁,多个线程也没问题,是串行的)。但是后面有问题:前一个线程刚把队列填满了,自己阻塞了,但其他线程却没有判断就直接生产了,而事实上队列已经满了。所以应该要下面这样:
put( ){
lock( );
while( isFull ) // 没有位置可以生产了
{
wait(notFull);
}
produce;
isEmpty=fase;
putIIndex++;
putIndex%=maxSize;
if(putIndex==takeIndex) //满了
isFull=true;
notify(notEmpty);
}
take()
{
lock( );
while(isEmpty)
wait(notEmpty);//不能直接消费
cosumer;
isFull=false;
takeIndex++;
takeIndex%=maxSize;
if(putIndex==takeIndex) //没有产品消费了,空了
isEmpty=true;
notify(notFull);
}
参考:c++教程网
muduo网络库
linux多线程服务器端编程》.陈硕
相关文章推荐
- UART0串口编程系列 串口编程(UART0)之中断方式(一)
- iOS 分享功能开发
- 常见的七大排序算法Java实现
- Oracle 10g
- 模板字符串
- iOS 获取 UITabViewController 和 UINavigationController 的图标位置
- 179. Largest Number(INT, String)
- underscore api 概览
- Spring 配置中的 default-lazy-init="false"
- 最简单的matplotlib安装和使用
- 最简单的matplotlib安装和使用
- Data Structure(1-6)---单链表的整表创建
- gdal 图像金字塔
- 【BZOJ 1001】狼抓兔子 对偶图+SPFA
- ***XAMPP:报错 Unable to load dynamic library的解决方法
- MapReduce计数器
- 谈谈java中的WeakReference
- ES6的全新特性:模板字符串
- 树莓派python,tornado,返回json数据代码,初级网络编程。
- appcompat_v7 dont R linux ubuntu