生产者-消费者模型的解决思路——自建队列
2004-09-02 13:41
211 查看
笔者曾遇到这样的需求:某软件在运行中随时有可能向外发送短信,一方面发送短信的设备是个独占资源,另一方面有多个线程要发短信。按照“把不稳定因素限定在一个实体中”的原则,自然就用一个专门的线程来操作短信设备,它的任务是从消息队列中取出要发送的短信,通过短信设备发送出去。
这是比较常见的生产者-消费者模型,即一个模块产生数据,另外模块取得数据并进行处理。如何实现互斥?如何让生产者和消费者都能够方便的工作?
类似这样的情况在编程中十分常见,采用消息队列是很好的解决办法。本文给出的消息队列具有以下特色:
·消息的大小、结构是自由的,甚至可以是一个对象;
·消息队列的长度(容纳消息的个数)是可设定的,超过最大长度可以选择丢弃或等待;
·新消息一般是放在队列的最后面,也可以放在最前面;
·代码少,思路简洁,你可以根据情况扩展;
【设计思路】
设计思路其实很简单,消息队列类CHMQ是个模板类,用CList保存消息,用CriticalSection来实现互斥,用Event来控制消息队列为空或已满的情况。提供以下几个常用接口:
·Push() 在队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
·Insert() 在对列头增加消息,其他同Push()。
·Pop() 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。
·SetMaxCount() 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。 或丢弃消息,或阻塞直到队列不为满的时候。
·GetMaxCount() 返回消息队列的最大长度,如果没有设置最大长度则返回-1。
·GetQueueCount() 返回目前消息队列中的消息个数。
【扩展】
(1) 目前一个CHMQ对象仅支持一个消息队列,可以扩展成多个消息队列
(2) 消息队列为空时,Pop()一直阻塞到有新消息时才返回,可以扩展成超时返回
(3) 消息队列已满时,Push()和Insert()可能会阻塞到队列不满的时候,可以扩展成超时丢弃
#ifndef CHMQ_H
#define CHMQ_H
#include
#include
template
class CHMQ
{
public:
CHMQ()
{
m_nMax = -1;
m_bDrop = FALSE;
::InitializeCriticalSection(&m_lock);
m_hPushEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
ASSERT(m_hPushEvent != NULL);
m_hPopEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
ASSERT(m_hPopEvent != NULL);
}
~CHMQ()
{
::DeleteCriticalSection(&m_lock);
::CloseHandle(m_hPushEvent);
::CloseHandle(m_hPopEvent);
m_list.RemoveAll();
}
// 队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
void Push(TYPE& type)
{
::EnterCriticalSection(&m_lock);
// 如果消息队列已满
if ( m_nMax > 0 && m_list.GetCount() >= m_nMax)
{
::ResetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
if( m_bDrop )
return;
if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
m_list.AddTail(type);
::SetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
}
// 队列头增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
void Insert(TYPE& type)
{
::EnterCriticalSection(&m_lock);
// 如果消息队列已满
if ( m_nMax > 0 && m_list.GetCount() >= m_nMax)
{
::ResetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
if( m_bDrop )
return;
if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
m_list.AddHead(type);
::SetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
}
// 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。
TYPE Pop()
{
TYPE type;
::EnterCriticalSection(&m_lock);
// 如果队列为空
if (m_list.IsEmpty())
{
::ResetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
if ( ::WaitForSingleObject(m_hPopEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
type = m_list.RemoveHead();
::SetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
return type;
}
// 返回目前消息队列中的消息个数。
int GetQueueCount()
{
int nCount = 0;
::EnterCriticalSection(&m_lock);
nCount = m_list.GetCount();
::LeaveCriticalSection(&m_lock);
return nCount;
}
// 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。
// 或丢弃消息,或阻塞直到队列不为满的时候。
void SetMaxCount(int nMax = -1, BOOL bDrop = FALSE)
{
::EnterCriticalSection(&m_lock);
m_nMax = nMax;
m_bDrop = bDrop;
::LeaveCriticalSection(&m_lock);
}
// 返回消息队列的最大长度,如果没有设置最大长度则返回-1。
int GetMaxCount()
{
::EnterCriticalSection(&m_lock);
::LeaveCriticalSection(&m_lock);
return m_nMax;
}
private:
CRITICAL_SECTION m_lock;
CList m_list;
HANDLE m_hPopEvent;
HANDLE m_hPushEvent;
int m_nMax;
BOOL m_bDrop;
};
#endif // CHMQ_H
这是比较常见的生产者-消费者模型,即一个模块产生数据,另外模块取得数据并进行处理。如何实现互斥?如何让生产者和消费者都能够方便的工作?
类似这样的情况在编程中十分常见,采用消息队列是很好的解决办法。本文给出的消息队列具有以下特色:
·消息的大小、结构是自由的,甚至可以是一个对象;
·消息队列的长度(容纳消息的个数)是可设定的,超过最大长度可以选择丢弃或等待;
·新消息一般是放在队列的最后面,也可以放在最前面;
·代码少,思路简洁,你可以根据情况扩展;
【设计思路】
设计思路其实很简单,消息队列类CHMQ是个模板类,用CList保存消息,用CriticalSection来实现互斥,用Event来控制消息队列为空或已满的情况。提供以下几个常用接口:
·Push() 在队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
·Insert() 在对列头增加消息,其他同Push()。
·Pop() 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。
·SetMaxCount() 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。 或丢弃消息,或阻塞直到队列不为满的时候。
·GetMaxCount() 返回消息队列的最大长度,如果没有设置最大长度则返回-1。
·GetQueueCount() 返回目前消息队列中的消息个数。
【扩展】
(1) 目前一个CHMQ对象仅支持一个消息队列,可以扩展成多个消息队列
(2) 消息队列为空时,Pop()一直阻塞到有新消息时才返回,可以扩展成超时返回
(3) 消息队列已满时,Push()和Insert()可能会阻塞到队列不满的时候,可以扩展成超时丢弃
#ifndef CHMQ_H
#define CHMQ_H
#include
#include
template
class CHMQ
{
public:
CHMQ()
{
m_nMax = -1;
m_bDrop = FALSE;
::InitializeCriticalSection(&m_lock);
m_hPushEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
ASSERT(m_hPushEvent != NULL);
m_hPopEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
ASSERT(m_hPopEvent != NULL);
}
~CHMQ()
{
::DeleteCriticalSection(&m_lock);
::CloseHandle(m_hPushEvent);
::CloseHandle(m_hPopEvent);
m_list.RemoveAll();
}
// 队列尾增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
void Push(TYPE& type)
{
::EnterCriticalSection(&m_lock);
// 如果消息队列已满
if ( m_nMax > 0 && m_list.GetCount() >= m_nMax)
{
::ResetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
if( m_bDrop )
return;
if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
m_list.AddTail(type);
::SetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
}
// 队列头增加消息,如果消息队列已满,根据设定或丢弃消息,或阻塞直到队列不为满的时候。
void Insert(TYPE& type)
{
::EnterCriticalSection(&m_lock);
// 如果消息队列已满
if ( m_nMax > 0 && m_list.GetCount() >= m_nMax)
{
::ResetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
if( m_bDrop )
return;
if ( ::WaitForSingleObject(m_hPushEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
m_list.AddHead(type);
::SetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
}
// 从队列头中取消息,如果队列为空则阻塞,直到不为空的时候。
TYPE Pop()
{
TYPE type;
::EnterCriticalSection(&m_lock);
// 如果队列为空
if (m_list.IsEmpty())
{
::ResetEvent(m_hPopEvent);
::LeaveCriticalSection(&m_lock);
if ( ::WaitForSingleObject(m_hPopEvent, INFINITE) != WAIT_OBJECT_0)
ASSERT(FALSE);
::EnterCriticalSection(&m_lock);
}
type = m_list.RemoveHead();
::SetEvent(m_hPushEvent);
::LeaveCriticalSection(&m_lock);
return type;
}
// 返回目前消息队列中的消息个数。
int GetQueueCount()
{
int nCount = 0;
::EnterCriticalSection(&m_lock);
nCount = m_list.GetCount();
::LeaveCriticalSection(&m_lock);
return nCount;
}
// 设置消息队列的最大长度,以及如果消息队列满的时候新消息的处理方式。
// 或丢弃消息,或阻塞直到队列不为满的时候。
void SetMaxCount(int nMax = -1, BOOL bDrop = FALSE)
{
::EnterCriticalSection(&m_lock);
m_nMax = nMax;
m_bDrop = bDrop;
::LeaveCriticalSection(&m_lock);
}
// 返回消息队列的最大长度,如果没有设置最大长度则返回-1。
int GetMaxCount()
{
::EnterCriticalSection(&m_lock);
::LeaveCriticalSection(&m_lock);
return m_nMax;
}
private:
CRITICAL_SECTION m_lock;
CList m_list;
HANDLE m_hPopEvent;
HANDLE m_hPushEvent;
int m_nMax;
BOOL m_bDrop;
};
#endif // CHMQ_H
相关文章推荐
- java中使用阻塞队列解决生产者消费者问题
- 11.python并发入门(part8 基于线程队列实现生产者消费者模型)
- 线程/GIL/线程锁/信号量/守护进程/Event事件/queue队列/生产者消费者模型
- 生产者/消费者模型改进版 ——队列
- Java多线程之~~~使用Exchanger在线程之间交换数据[这个结合多线程并行会有解决很多问题]生产者消费者模型
- 什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?
- 基于单链表和环形队列的生产者-消费者模型
- (新思路)生产者消费者模型(condition的解决方法)
- 人生苦短之我用Python篇(队列、生产者和消费者模型)
- 多线程+阻塞队列实现生产者-消费者模型获取队列数据问题
- 微软100题(34)生产者消费者模型实现队列
- 【Java并发】阻塞队列BlockingQueue和生产者-消费者模型笔记
- 生产者/消费者模式(阻塞队列) 一个类似于监听者模式的并发模型
- (新思路)生产者消费者模型2(BlockingQueue的无锁解决方法)
- 线程池 队列生产者消费者模型实现
- 用消息队列模拟生产者消费者模型
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 使用阻塞队列解决生产者-消费者问题
- android 消费者和生产者的队列处理模型
- 经典的生产者与消费者模型(基于BlockingQueue队列实现)