您的位置:首页 > 编程语言 > C语言/C++

C++阻塞队列实现

2016-02-26 13:20 507 查看
阻塞队列是后台开发中多线程异步架构的基本数据结构,像python, java 都提供线程安全的阻塞队列,c++ 可能需要自己实现一个模板。

从性能考虑,自己没有使用STL的queue作为基本数据结构,而是使用循环数组作为基本数据结构,性能应该比queue高,省去了动态内存分配和回收。

确点就是,队列大小不可动态扩展,当时实际开发中,可以通过压力测试和内存的限制,配置合适的队列大小来满足应用需求。

程序代码

#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H

#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
using namespace std;

template<class T>
class block_queue
{
public:
block_queue(int max_size = 1000)
{
if(max_size <= 0)
{
exit(-1);
}

m_max_size = max_size;
m_array = new T[max_size];
m_size = 0;
m_front = -1;
m_back = -1;

m_mutex = new pthread_mutex_t;
m_cond = new pthread_cond_t;
pthread_mutex_init(m_mutex, NULL);
pthread_cond_init(m_cond, NULL);
}

void clear()
{
pthread_mutex_lock(m_mutex);
m_size = 0;
m_front = -1;
m_back = -1;
pthread_mutex_unlock(m_mutex);
}

~block_queue()
{
pthread_mutex_lock(m_mutex);
if(m_array != NULL)
delete  m_array;
pthread_mutex_unlock(m_mutex);

pthread_mutex_destroy(m_mutex);
pthread_cond_destroy(m_cond);

delete m_mutex;
delete m_cond;
}

bool full()const
{
pthread_mutex_lock(m_mutex);
if(m_size >= m_max_size)
{
pthread_mutex_unlock(m_mutex);
return true;
}
pthread_mutex_unlock(m_mutex);
return false;
}

bool empty()const
{
pthread_mutex_lock(m_mutex);
if(0 == m_size)
{
pthread_mutex_unlock(m_mutex);
return true;
}
pthread_mutex_unlock(m_mutex);
return false;
}

bool front(T& value)const
{
pthread_mutex_lock(m_mutex);
if(0 == m_size)
{
pthread_mutex_unlock(m_mutex);
return false;
}
value = m_array[m_front];
pthread_mutex_unlock(m_mutex);
return true;
}

bool back(T& value)const
{
pthread_mutex_lock(m_mutex);
if(0 == m_size)
{
pthread_mutex_unlock(m_mutex);
return false;
}
value = m_array[m_back];
pthread_mutex_unlock(m_mutex);
return true;
}

int size()const
{
int tmp = 0;
pthread_mutex_lock(m_mutex);
tmp = m_size;
pthread_mutex_unlock(m_mutex);
return tmp;
}

int max_size()const
{
int tmp = 0;
pthread_mutex_lock(m_mutex);
tmp = m_max_size;
pthread_mutex_unlock(m_mutex);
return tmp;
}

bool push(const T& item)
{
pthread_mutex_lock(m_mutex);
if(m_size >= m_max_size)
{
pthread_cond_broadcast(m_cond);
pthread_mutex_unlock(m_mutex);
return false;
}

m_back = (m_back + 1) % m_max_size;
m_array[m_back] = item;

m_size++;
pthread_cond_broadcast(m_cond);
pthread_mutex_unlock(m_mutex);

return true;
}

bool pop(T& item)
{
pthread_mutex_lock(m_mutex);
while(m_size <= 0)
{
if(0 != pthread_cond_wait(m_cond, m_mutex))
{
pthread_mutex_unlock(m_mutex);
return false;
}
}

m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
pthread_mutex_unlock(m_mutex);
return true;
}

bool pop(T& item, int ms_timeout)
{
struct timespec t = {0,0};
struct timeval now = {0,0};
gettimeofday(&now, NULL);
pthread_mutex_lock(m_mutex);
if(m_size <= 0)
{
t.tv_sec = now.tv_sec + ms_timeout/1000;
t.tv_nsec = (ms_timeout % 1000)*1000;
if(0 != pthread_cond_timedwait(m_cond, m_mutex, &t))
{
pthread_mutex_unlock(m_mutex);
return false;
}
}

if(m_size <= 0)
{
pthread_mutex_unlock(m_mutex);
return false;
}

m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];m_size--;
pthread_mutex_unlock(m_mutex);
return true;
}

private:
pthread_mutex_t *m_mutex;
pthread_cond_t *m_cond;
T *m_array;
int m_size;
int m_max_size;
int m_front;
int m_back;
};

#endif


测试程序

#include<iostream>
#include"block_queue.h"
using namespace std;

block_queue<int> g_queue(100);
void *p(void *args)
{
sleep(1);
int data = 0;
for(int i = 0; i < 100; i++)
{
g_queue.push(data++);
}
return NULL;
}

void *c(void* args)
{
while(true)
{
int t = 0;
if(!g_queue.pop(t,1000))
{
cout<<"timeout"<<endl;
continue;
}
else
{
cout<<t<<endl;
}
g_queue.pop(t);
cout<<"block="<<t<<endl;

}

return NULL;
}

int main()
{
pthread_t id;
pthread_create(&id, NULL, p, NULL);
//pthread_create(&id, NULL, p, NULL);
//pthread_create(&id, NULL, c, NULL);
pthread_create(&id, NULL, c, NULL);
for(;;)sleep(1);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  c++ 阻塞队列 多线程