您的位置:首页 > 编程语言 > C语言/C++

生产者消费者问题——C++ windows版 多生产者多消费者的队列实现

2014-06-09 23:51 561 查看
  最进要写一个多线程加载资源的资源管理器(ResourceManager)和多线程音频解码器(MultiThread Decoder)。因为距最近一次用到多线程放下好久了,所以今天把生产者消费者问题练一下手。

  为什么选择生产者消费者问题,因为他比较接近资源管理器和多线程音频解码器的原型。

  比如,对于音频解码器,音频线程去流式的解码一段MP3格式的内存,就类似生产者生产产品的过程;而音频播放API(如OpenAL,OpenSL)通常需要的是PCM数据,也就是生产者生产的产品,所以播放逻辑充当消费者的角色,典型的生产者消费者问题。

  再对于资源管理器,加载Mesh和Texture类似生产单个Resource的过程,而相应的渲染逻辑去使用资源就相当于消费资源的过程,但不同的是最后当不再使用这个资源的时候,这个资源才会被释放,而非使用一次。

  今天抽时间,写了一个C++ windows版 多生产者多消费者的队列实现,pthread版估计等需要做跨平台的时候再做,如果想方便直接用OpenMP或者TBB也是可以的,但是对于轻量级引擎,自己实现资源加载器完全足够了。

  

#include <Windows.h>
#include <map>
#include <queue>

CRITICAL_SECTION g_cs;    // mutex

HANDLE    g_hEmptyBufferSemaphore;
HANDLE  g_hFullBufferSemaphore;

#define INVALID  -1

#define PRODUCER 5
#define CUSTOMER 5

#define NUM_COUNT 8
#define BUFF_SIZE 4

static std::queue<int> bufferQueue;
static std::map< DWORD,int > countMap;

bool ProducerFinish = false;

//设置控制台输出颜色
BOOL SetConsoleColor(WORD wAttributes)
{
HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
if (hConsole == INVALID_HANDLE_VALUE)
return FALSE;
return SetConsoleTextAttribute(hConsole, wAttributes);
}

DWORD WINAPI ProducerFunc(LPVOID param)
{

while(true)
{
WaitForSingleObject( g_hEmptyBufferSemaphore, INFINITE);

EnterCriticalSection(&g_cs);

DWORD threadId = GetCurrentThreadId();
if(countMap.find(threadId) == countMap.end())
countMap[threadId] = 0;

int productID = ++countMap[threadId];

bufferQueue.push( productID);
printf("生产者%d , 生产%d\n", threadId, productID);

if( productID == NUM_COUNT )
{
SetConsoleColor(FOREGROUND_RED);
printf("生产者%d生产完毕\n",GetCurrentThreadId());
SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);

LeaveCriticalSection(&g_cs);
ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL);

break;
}

LeaveCriticalSection(&g_cs);

ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL);
}

return NULL;
}

DWORD WINAPI CustomerFunc(LPVOID param)
{
while (true)
{
WaitForSingleObject( g_hFullBufferSemaphore, INFINITE);

EnterCriticalSection(&g_cs);

int buffer = -1;

if(!bufferQueue.empty())
{
buffer = bufferQueue.front();
bufferQueue.pop();
}

if(buffer != INVALID )
printf("消费者%d ,消费%d\n", GetCurrentThreadId() , buffer);

if( bufferQueue.empty() && ProducerFinish)
{
printf("消费者%d 结束\n",GetCurrentThreadId());
LeaveCriticalSection(&g_cs);

// 通知其他消费者可以结束了
ReleaseSemaphore( g_hFullBufferSemaphore, 1, NULL);
break;
}

LeaveCriticalSection(&g_cs);

ReleaseSemaphore( g_hEmptyBufferSemaphore, 1, NULL);
}

return NULL;
}

int _tmain(int argc, _TCHAR* argv[])
{
InitializeCriticalSection(&g_cs);

g_hEmptyBufferSemaphore = CreateSemaphore( NULL, 4, 4, NULL);
g_hFullBufferSemaphore = CreateSemaphore( NULL, 0, 4, NULL);

HANDLE producerThreads[PRODUCER];
HANDLE customerThreads[CUSTOMER];

// producer
for (int i = 0; i < PRODUCER; ++i)
{
producerThreads[i] = CreateThread( NULL, 0, ProducerFunc, NULL, NULL, NULL);
}

// customers
for (int i = 0; i < CUSTOMER; ++i)
customerThreads[i] = CreateThread( NULL, 0, CustomerFunc, NULL, NULL, NULL);

WaitForMultipleObjects( PRODUCER, producerThreads, TRUE, INFINITE);
ProducerFinish = true;

WaitForMultipleObjects( CUSTOMER, customerThreads, TRUE, INFINITE);

for (int i = 0; i < PRODUCER; ++i)
CloseHandle(producerThreads[i]);

for (int i = 0; i < CUSTOMER; ++i)
CloseHandle(customerThreads[i]);

CloseHandle(g_hEmptyBufferSemaphore);
CloseHandle(g_hFullBufferSemaphore);

DeleteCriticalSection(&g_cs);

countMap.clear();

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: