您的位置:首页 > 产品设计 > UI/UE

Windows下一个并发阻塞队列(BlockingQueue)

2014-03-26 17:06 399 查看
Windows下一个带有大小限制的并发阻塞队列,实现得比较简单。

[cpp] view
plaincopy

#ifndef BLOCKINGQUEUE_H_

#define BLOCKINGQUEUE_H_

#include <queue>

#include <stdexcept>

#include <windows.h>

using namespace std;

template <typename T>

class BoundedBlockingQueue

{

public:

BoundedBlockingQueue(int size) : maxSize(size)

{

_lock = CreateMutex(NULL,false,NULL);

_rsem = CreateSemaphore(NULL,0,size,NULL);

_wsem = CreateSemaphore(NULL,size,size,NULL);

}

~BoundedBlockingQueue()

{

CloseHandle(_lock);

CloseHandle(_rsem);

CloseHandle(_wsem);

}

void push(const T& data);

T pop();

bool empty()

{

WaitForSingleObject(_lock,INFINITE);

bool is_empty = _array.empty();

ReleaseMutex(_lock);

return is_empty;

}

private:

deque<T> _array;

int maxSize;

HANDLE _lock;

HANDLE _rsem, _wsem;

};

/**
 * Appends a copy of `value` to the back of the queue.
 *
 * Waits for a free slot (_wsem), then for the mutex, stores the element,
 * and finally signals one stored item (_rsem) to consumers.
 *
 * If storing the element throws (allocation failure or T's copy
 * constructor), the mutex is released and the free slot is handed back
 * before the exception propagates, so the queue stays usable.
 */
template <typename T>
void BoundedBlockingQueue<T>::push(const T& value)
{
    WaitForSingleObject(_wsem, INFINITE);
    WaitForSingleObject(_lock, INFINITE);

    try
    {
        _array.push_back(value);
    }
    catch (...)
    {
        // Undo both acquisitions; nothing was added to the queue.
        ReleaseMutex(_lock);
        ReleaseSemaphore(_wsem, 1, NULL);
        throw;
    }

    ReleaseMutex(_lock);
    ReleaseSemaphore(_rsem, 1, NULL);
}

/**
 * Removes the oldest element and returns it by value.
 *
 * Waits for a stored item (_rsem), then for the mutex, copies the front
 * element out, removes it, and signals one free slot (_wsem) to producers.
 *
 * If copying the front element throws, the element is left in the queue,
 * the mutex is released and the item count is restored before the
 * exception propagates.
 */
template <typename T>
T BoundedBlockingQueue<T>::pop()
{
    WaitForSingleObject(_rsem, INFINITE);
    WaitForSingleObject(_lock, INFINITE);

    // Tracks whether the element has left the queue, so the handler below
    // knows whether the acquisitions still need to be rolled back.
    bool removed = false;
    try
    {
        T item(_array.front());
        _array.pop_front();
        removed = true;

        ReleaseMutex(_lock);
        ReleaseSemaphore(_wsem, 1, NULL);
        return item;
    }
    catch (...)
    {
        if (!removed)
        {
            ReleaseMutex(_lock);
            ReleaseSemaphore(_rsem, 1, NULL);
        }
        throw;
    }
}

#endif

主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。

[cpp] view
plaincopy

#include "BlockingQueue.h"

#include <windows.h>

#include <iostream>

using namespace std;

bool is_over = false;

/**
 * Producer thread entry point.
 *
 * Pushes the integers 1 through 50 into the queue passed via `lppara`,
 * printing each one and pausing 10 ms between pushes, then sets the global
 * `is_over` flag.
 *
 * @param lppara pointer to the shared BoundedBlockingQueue<int>.
 * @return always 0.
 */
DWORD WINAPI produce(LPVOID lppara)
{
    BoundedBlockingQueue<int> *target =
        static_cast<BoundedBlockingQueue<int> *>(lppara);

    int value = 1;
    while (value <= 50)
    {
        target->push(value);
        cout << GetCurrentThreadId() << " put a data: " << value << endl;

        Sleep(10); // pause 10 ms before producing the next value
        ++value;
    }

    // Tell the consumers that no further values will be produced.
    is_over = true;
    return 0;
}

/**
 * Consumer thread entry point.
 *
 * Repeatedly pops a value from the queue passed via `lppara` and prints it.
 * After each pop it checks whether the producer has set `is_over` and the
 * queue is empty; if so it prints "OVER!" and returns, otherwise it pauses
 * 10 ms and pops again.
 *
 * NOTE(review): the exit check runs only after a successful pop(). A
 * consumer that is waiting inside pop() when the queue has been drained is
 * never woken, so it may not terminate — confirm whether that is acceptable.
 *
 * @param lppara pointer to the shared BoundedBlockingQueue<int>.
 * @return always 0.
 */
DWORD WINAPI consume(LPVOID lppara)
{
    BoundedBlockingQueue<int> *source =
        static_cast<BoundedBlockingQueue<int> *>(lppara);

    for (;;)
    {
        const int received = source->pop();
        cout << GetCurrentThreadId() << " get data: " << received << endl;

        // Keep going unless production has finished AND nothing is left.
        if (!is_over || !source->empty())
        {
            Sleep(10); // pause 10 ms before consuming the next value
            continue;
        }

        cout << "OVER!" << endl;
        break;
    }

    return 0;
}

int main()

{

DWORD write_data;

DWORD read_data;

DWORD read_data1;

BoundedBlockingQueue<int> queue(20);

//一个生产者、两个消费者

if(CreateThread(NULL,0,produce,&queue,0,&write_data)==NULL)

return -1;

if(CreateThread(NULL,0,consume,&queue,0,&read_data)==NULL)

return -1;

if(CreateThread(NULL,0,consume,&queue,0,&read_data1)==NULL)

return -1;

char ch;

while(1)

{

ch = getchar(); //press "e" to exit

if(ch == 'e') break;

}

printf("Program ends successfully\n");

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐