Windows下一个并发阻塞队列(BlockingQueue)
2014-03-26 17:06
399 查看
Windows下一个带有大小限制的并发阻塞队列,实现的比较简单。
[cpp] view
plaincopy
#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_
#include <queue>
#include <windows.h>
using namespace std;
template <typename T>
class BoundedBlockingQueue
{
public:
BoundedBlockingQueue(int size) : maxSize(size)
{
_lock = CreateMutex(NULL,false,NULL);
_rsem = CreateSemaphore(NULL,0,size,NULL);
_wsem = CreateSemaphore(NULL,size,size,NULL);
}
~BoundedBlockingQueue()
{
CloseHandle(_lock);
CloseHandle(_rsem);
CloseHandle(_wsem);
}
void push(const T& data);
T pop();
bool empty()
{
WaitForSingleObject(_lock,INFINITE);
bool is_empty = _array.empty();
ReleaseMutex(_lock);
return is_empty;
}
private:
deque<T> _array;
int maxSize;
HANDLE _lock;
HANDLE _rsem, _wsem;
};
template <typename T>
void BoundedBlockingQueue <T>::push(const T& value )
{
WaitForSingleObject(_wsem,INFINITE);
WaitForSingleObject(_lock,INFINITE);
_array.push_back(value);
ReleaseMutex(_lock);
ReleaseSemaphore(_rsem,1,NULL);
}
template <typename T>
T BoundedBlockingQueue<T>::pop()
{
WaitForSingleObject(_rsem,INFINITE);
WaitForSingleObject(_lock,INFINITE);
T _temp = _array.front();
_array.pop_front();
ReleaseMutex(_lock);
ReleaseSemaphore(_wsem,1,NULL);
return _temp;
}
#endif
主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。
[cpp] view
plaincopy
#include "BlockingQueue.h"
#include <windows.h>
#include <iostream>
using namespace std;
bool is_over = false;
DWORD WINAPI produce(LPVOID lppara)
{
BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
while(1)
{
for(int i=1; i<=50; ++i)
{
queue->push(i);
cout<<GetCurrentThreadId()<<" put a data: "<<i<<endl;
Sleep(10); //producer is fast
}
is_over = true;
break;
}
return NULL;
}
DWORD WINAPI consume(LPVOID lppara)
{
BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
while(1)
{
int d = queue->pop();
cout<<GetCurrentThreadId()<<" get data: "<<d<<endl;
//double check
if(is_over && queue->empty())
{
cout<<"OVER!"<<endl;
break;
}
Sleep(10); //consumer is slow
}
return NULL;
}
int main()
{
DWORD write_data;
DWORD read_data;
DWORD read_data1;
BoundedBlockingQueue<int> queue(20);
//一个生产者、两个消费者
if(CreateThread(NULL,0,produce,&queue,0,&write_data)==NULL)
return -1;
if(CreateThread(NULL,0,consume,&queue,0,&read_data)==NULL)
return -1;
if(CreateThread(NULL,0,consume,&queue,0,&read_data1)==NULL)
return -1;
char ch;
while(1)
{
ch = getchar(); //press "e" to exit
if(ch == 'e') break;
}
printf("Program ends successfully\n");
return 0;
}
[cpp] view
plaincopy
#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_
#include <queue>
#include <windows.h>
using namespace std;
template <typename T>
class BoundedBlockingQueue
{
public:
BoundedBlockingQueue(int size) : maxSize(size)
{
_lock = CreateMutex(NULL,false,NULL);
_rsem = CreateSemaphore(NULL,0,size,NULL);
_wsem = CreateSemaphore(NULL,size,size,NULL);
}
~BoundedBlockingQueue()
{
CloseHandle(_lock);
CloseHandle(_rsem);
CloseHandle(_wsem);
}
void push(const T& data);
T pop();
bool empty()
{
WaitForSingleObject(_lock,INFINITE);
bool is_empty = _array.empty();
ReleaseMutex(_lock);
return is_empty;
}
private:
deque<T> _array;
int maxSize;
HANDLE _lock;
HANDLE _rsem, _wsem;
};
template <typename T>
void BoundedBlockingQueue <T>::push(const T& value )
{
WaitForSingleObject(_wsem,INFINITE);
WaitForSingleObject(_lock,INFINITE);
_array.push_back(value);
ReleaseMutex(_lock);
ReleaseSemaphore(_rsem,1,NULL);
}
template <typename T>
T BoundedBlockingQueue<T>::pop()
{
WaitForSingleObject(_rsem,INFINITE);
WaitForSingleObject(_lock,INFINITE);
T _temp = _array.front();
_array.pop_front();
ReleaseMutex(_lock);
ReleaseSemaphore(_wsem,1,NULL);
return _temp;
}
#endif
主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。
[cpp] view
plaincopy
#include "BlockingQueue.h"
#include <windows.h>
#include <iostream>
using namespace std;
bool is_over = false;
DWORD WINAPI produce(LPVOID lppara)
{
BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
while(1)
{
for(int i=1; i<=50; ++i)
{
queue->push(i);
cout<<GetCurrentThreadId()<<" put a data: "<<i<<endl;
Sleep(10); //producer is fast
}
is_over = true;
break;
}
return NULL;
}
DWORD WINAPI consume(LPVOID lppara)
{
BoundedBlockingQueue<int> *queue = (BoundedBlockingQueue<int> *)lppara;
while(1)
{
int d = queue->pop();
cout<<GetCurrentThreadId()<<" get data: "<<d<<endl;
//double check
if(is_over && queue->empty())
{
cout<<"OVER!"<<endl;
break;
}
Sleep(10); //consumer is slow
}
return NULL;
}
int main()
{
DWORD write_data;
DWORD read_data;
DWORD read_data1;
BoundedBlockingQueue<int> queue(20);
//一个生产者、两个消费者
if(CreateThread(NULL,0,produce,&queue,0,&write_data)==NULL)
return -1;
if(CreateThread(NULL,0,consume,&queue,0,&read_data)==NULL)
return -1;
if(CreateThread(NULL,0,consume,&queue,0,&read_data1)==NULL)
return -1;
char ch;
while(1)
{
ch = getchar(); //press "e" to exit
if(ch == 'e') break;
}
printf("Program ends successfully\n");
return 0;
}
相关文章推荐
- Windows下一个并发阻塞队列(BlockingQueue)
- Windows下一个并发阻塞队列(BlockingQueue)
- (13)多线程与并发库之java5阻塞队列(BlockingQueue)的应用----子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程循环100次,如此循环50次
- JAVA并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- 并发队列 – 无界阻塞队列 LinkedBlockingQueue 原理探究
- (java多线程与并发)java并发库中的阻塞队列--BlockingQueue
- 并发队列ConcurrentLinkedQueue和阻塞栈LinkedBlockingQueue用法和阻塞队列ArrayBlockingQueue
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java并发容器之阻塞队列BlockingQueue
- 并发工具包—阻塞队列BlockingQueue
- Java多线程与并发库高级应用之阻塞队列BlockingQueue
- java并发库中的阻塞队列--BlockingQueue
- Java 并发 --- 阻塞队列之LinkedBlockingQueue源码分析
- Java多线程/并发26、阻塞队列BlockingQueue
- java-并发集合-阻塞队列 LinkedBlockingQueue 演示
- (13)多线程与并发库之java5阻塞队列(BlockingQueue)的应用----子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程循环100次,如此循环50次
- 并发队列 – 无界阻塞队列 LinkedBlockingQueue 原理探究
- Java并发编程:阻塞队列BlockingQueue
- Java并发包:阻塞队列(BlockingQueue)