您的位置:首页 > 编程语言 > C语言/C++

C++ 实现线程安全的任务队列

2015-03-06 13:46 316 查看
C++ 实现线程安全的任务队列



flyfish 2015-3-6

一、三个接口函数说明

1 add 新增任务

2 get_nonblocking 非阻塞获取任务或者空任务

3 get_blocking 阻塞获取任务

头文件

#pragma once
#include <deque>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>

//任务 网络发送任务使用的结构,通常有一个发送缓冲区和一个实际要发送的长度
class task
{
public:
unsigned char data[2048];
unsigned int len;//实际发送长度
task::task();
task::~task();
};

class task_queue
{
private:
std::deque<task> tasks;
boost::mutex tasks_mutex;
boost::condition_variable cv;
public:
task_queue::task_queue();
task_queue::~task_queue();

void add(const task& task);
std::tuple<bool,task> get_nonblock();
task get_block();
};


实现文件

#include "task_queue.h"

//task
task::task()
{
for (int i=0;i<2048;i++)
data[i] = 0;
}
task::~task()
{

}

// task queue
task_queue::task_queue()
{

}
task_queue::~task_queue()
{

}
void task_queue::add(const task& task)
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);//不允许其他线程执行
tasks.push_back(task);
lock.unlock();
cv.notify_one();//通知其他线程继续
}
std::tuple<bool,task> task_queue::get_nonblock()
{
boost::lock_guard<boost::mutex> lock(tasks_mutex);
std::tuple<bool,task> ret;
if (!tasks.empty())
{
ret=std::make_tuple(true,tasks.front());
tasks.pop_front();
}
else
{
task tmp;
ret=std::make_tuple(false,tmp);
}
return ret;
}

task task_queue::get_block()
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);
while (tasks.empty())
{
cv.wait(lock);
}
task ret=tasks.front();
tasks.pop_front();
return ret;
}


二 解释

1 notify_one用于唤醒一个等待该条件(condition)发生的线程

2 可以使用boost::mutex::scoped_lock 替换boost::unique_lock<boost::mutex>,可以避免遗漏unlock

例如

boost::mutex mutex_;
try
{
mutex_.lock();
//do something
mutex_.unlock();
}
catch(...)
{
mutex_.unlock();
return 0;
}


使用boost::mutex::scoped_lock就可以避免catch遗漏unlock

3 boost::unique_lock仅仅比boost::lock_guard附加一些功能

4 如果任务在获取之后不删除,就可以使用多读一写方式,就要实现读写锁

读操作发生时: 写线程停止操作,允许多个线程同时读操作

写操作发生时: 只允许同一时刻只有一个线程写操作,其他无论读写线程全部停止。

代码类似

typedef boost::shared_lock<boost::shared_mutex> r_lock;
typedef boost::unique_lock<boost::shared_mutex> w_lock;
boost::shared_mutex mutex_;

void read()
{
r_lock lock(mutex_);
//do something
}
void write()
{
w_lock lock(mutex_);
//do something
}


模板方式

<pre name="code" class="cpp">#pragma once
#include <tuple>
#include <deque>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>

template <typename T>
class task_queue
{
private:
std::deque<T> tasks;
boost::mutex tasks_mutex;
boost::condition_variable cv;
public:

void add(const  T&  t)
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);

tasks.push_back(t);
lock.unlock();
cv.notify_one();
}
std::tuple<bool, T> get_nonblock()
{
boost::lock_guard<boost::mutex> lock(tasks_mutex);
std::tuple<bool,T> ret;
if (!tasks.empty())
{
ret=std::make_tuple(true,tasks.front());
tasks.pop_front();
}
else
{
T tmp;
ret=std::make_tuple(false,tmp);
}
return ret;
}
T get_block()
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);
while (tasks.empty())
{
cv.wait(lock);
}
T ret=tasks.front();
tasks.pop_front();
return ret;
}

};




以上程序在VC2010 Boost库下编译通过
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: