您的位置:首页 > 其它

muduo源码分析:无界队列和有界队列(消费者-生产者)

2015-12-09 17:11 344 查看
muduo库的队列很简单:

无界队列

put --生产

take--消费

size--返回现在的产品数

产品存放在deque里面

无界队列可以一直生产put,所以不用等待队列中有位置才生产,每生产一个产品就notify,只要通知一个消费者,不需要通知所有人

只有消费take时要判断有无产品,所以只需要一个mutex锁,一个条件变量notEmpty,消费者用来等待notEmpty有产品可以消费

#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/noncopyable.hpp>
#include <deque>
#include <assert.h>

namespace muduo
{
template<typename T>
class BlockingQueue : boost::noncopyable
{
public:
explicit BlockingQueue()
:mutex_(),
notEmptyCond_(mutex_),
queue_()
{}
void put(const T& t)
{
{
MutexLockGuard lock(mutex_);
queue_.push_back(t);
}
notEmptyCond_.notify();//不需要notifyAll通知所有的,如果有阻塞,代表原先为空,现在只生产了一个,也只要一个消费来就足够了
}
T take()
{
MutexLockGuard lock(mutex_);
while(queue_.empty())
notEmptyCond_.wait();
assert(!queue_.empty());
T front=queue_.front();
queue_.pop_front();
return front;
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

private:
mutable MutexLock mutex_;
Condition notEmptyCond_;
std::deque<T> queue_;
};

}

#endif


有界队列

有界队列用boost::circular_buffer<T> queue_实现
因为有界,所以生产的时候要等待有位置才能生产--用条件变量notFull
消费时用notEmpty条件变量等待有产品可以消费。
生产者和消费者处理的资源都是同一个queue_,所以是两个条件变量(一个等待非空,一个等待非满),但是只要一把mutex_锁,两个条件变量共用一把锁
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
#include <assert.h>

namespace muduo
{

template<typename T>
class BoundedBlockingQueue : boost::noncopyable
{
public:
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queue_(maxSize)
{
}

void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify(); // TODO: move outside of lock
}

T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
notFull_.notify(); // TODO: move outside of lock
return front;
}

bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}

bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}

size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}

private:
mutable MutexLock          mutex_;//一把锁,两个条件变量共用一把锁
Condition                  notEmpty_;
Condition                  notFull_;
boost::circular_buffer<T>  queue_;
};

}

#endif  // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H


如果不用boost的circular_buffer。可以用数组实现,putIndex=0 将要生产的位置;takeInex=0 将要读的位置,isFull=false 产品是否满了 , isEmpty=ture 产品空了
put( ){
lock( );
produce;//putIndex=0,刚开始直接生产没问题,但是把wait写后面在(2)的位置就错了---------------(1)
notify(notEmpty);
putIIndex++;
putIndex%=maxSize;
isEmpty=fase;
if(putIndex==takeIndex) //没有位置可以生产了
{
isFull=true;
wait(notFull); //不可以,错了---------------------------------------(2)
}
}
错误1:put操作,把产品都已经生产了加进去了,但是却还阻塞了,不返回。不符合常规逻辑
错误2:第一次 putIndex=0,刚开始直接生产没问题(加了锁,多个线程也没问题,是串行的)。但是后面有问题:前一个线程刚把队列填满了,自己阻塞了,但其他线程却没有判断就直接生产了,而事实上队列已经满了。所以应该要下面这样:

put( ){
lock( );

while( isFull ) // 没有位置可以生产了
{
wait(notFull);
}

produce;
isEmpty=fase;
putIIndex++;
putIndex%=maxSize;
if(putIndex==takeIndex) //满了
isFull=true;
notify(notEmpty);
}

take()
{
lock( );
while(isEmpty)
wait(notEmpty);//不能直接消费
cosumer;
isFull=false;
takeIndex++;
takeIndex%=maxSize;

if(putIndex==takeIndex) //没有产品消费了,空了
isEmpty=true;

notify(notFull);
}

参考:c++教程网

muduo网络库

linux多线程服务器端编程》.陈硕
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: