您的位置:首页 > 编程语言 > C语言/C++

基于C++11的线程池

2014-05-06 21:30 721 查看
转自/article/2879221.html

1.封装的线程对象

[cpp] view
plaincopyprint?





class task : public std::tr1::enable_shared_from_this<task>

{

public:

task():exit_(false){}

task( const task & ) = delete;

~task(){}

task & operator =( const task &) = delete;

void start();

void stop()

{

exit_ = true;

sync_.notify_one();

}

void set_job( const std::function<void()> & job, const std::string & file, int line)

{//提交任务

{

std::unique_lock<std::mutex> lock(mutex_);

job_ = job;

file_ = file;

line_ = line;

}

sync_.notify_one();//通知主线程有任务要执行....

}

void print_job(){

LOG(INFO)<<"sumbit from:"<<file_<<":"<<line_;

}

private:

bool exit_;

std::mutex mutex_;

std::condition_variable sync_;

std::function< void()> job_; //线程执行的任务,线程任意时刻,最多只能执行一个任务。

std::thread::id id_;

std::string file_;

int line_;

};<pre code_snippet_id="329966" snippet_file_name="blog_20140506_1_7374667" name="code" class="cpp">void task::start()

{

auto job_proxy = [this] (){

id_ = std::this_thread::get_id();

while( !exit_ )

{

std::unique_lock<std::mutex> lock(mutex_);

if( job_ )

{//有任务了,需要执行任务了

try

{

job_(); //执行任务的代码

}catch( std::exception & e)

{

}catch(...)

{

}

job_ = std::function<void()>(); //释放任务绑定的资源,主要为闭包捕获的资源,特别是shared_ptr对象.

tasks->job_completed( shared_from_this() ); //任务执行完成,通知线程池

}else{

//没有任务的时候,等待其他线程提交任务。。

sync_.wait(lock);

}

}

};

std::thread t(job_proxy); //创建并启动与task管理的线程

t.detach(); //分离模式,thread对象销毁了,但是其创建的线程还活着。。。

}

</pre>

<div><br>

</div>

<pre></pre>

<pre code_snippet_id="329966" snippet_file_name="blog_20140506_2_6023437" name="code" class="cpp"></pre>

2.线程池管理对象

[cpp] view
plaincopyprint?





class task_pool

{

public:

task_pool(unsigned int pool_size = 128):max_size_(pool_size),stop_all_(true)

{

}

~task_pool()

{

}

void job_completed( const std::tr1::shared_ptr<task> & t)//回收task对象

{

std::lock_guard<std::mutex> lock(mutex_);

bool need_to_notify = idle_tasks_.empty() && (!wait_for_running_jobs_.empty());

busying_tasks_.erase(t);

idle_tasks_.push_back(t);

LOG(INFO)<<"after job_completed, current idle tasks size:"<< idle_tasks_.size()

<<" busying tasks size:"<<busying_tasks_.size()

<<" wait for running jobs size:"<<wait_for_running_jobs_.size();

if( !busying_tasks_.empty() ){

(*busying_tasks_.begin())->print_job();

}

if( need_to_notify )//任务太多了,之前空闲线程使用完了,有任务在等待执行,需要通知

{

sync_.notify_one();

}

};

<span style="white-space:pre"> </span>//提交任务

void submit_job( const std::function<void()> & job, const std::string file, int line)

{

if( stop_all_ )

{

return;

}

std::lock_guard<std::mutex> lock(mutex_);

bool need_notify = wait_for_running_jobs_.empty();

wait_for_running_jobs_.push(std::make_tuple(job,file,line));

if( need_notify )//等待执行的任务为空时,需要通知,其他情况不需要通知.

{

sync_.notify_one();

}

}

void execute_job()

{

while(true)

{

std::unique_lock<std::mutex> lock(mutex_);

while(!stop_all_ && wait_for_running_jobs_.empty() )

{

//等待其他线程提交任务

sync_.wait(lock);

}

if( stop_all_ )

{

return;

}

while(!stop_all_ && idle_tasks_.empty())

{

//有任务要执行,但是没有空闲线程,等待其他任务执行完成。

sync_.wait(lock);

}

if( stop_all_ )

{

return;

}

//有任务,也有空闲线程了

auto t = get_task();

auto job =wait_for_running_jobs_.front();

wait_for_running_jobs_.pop();

<span style="white-space:pre"> </span>//分发任务到task 线程.

t->set_job(std::get<0>(job), std::get<1>(job), std::get<2>(job));

}

}

void stop_all()

{

std::lock_guard<std::mutex> lock(mutex_);

stop_all_ = true;

for( auto t : idle_tasks_ )

{

t->stop();

}

idle_tasks_.clear();

for( auto t : busying_tasks_ )

{

t->stop();

}

while(!wait_for_running_jobs_.empty()){

wait_for_running_jobs_.pop();

}

sync_.notify_one();

}

void start()

{// 初始化启动线程池主线程

try

{

std::thread t( [this]{ execute_job();});

t.detach();

stop_all_ = false;

allocate_tasks();

}catch( std::exception & e )

{

LOG(FATAL) << "start tasks pool ... error"<<e.what();

}

}

protected:

std::tr1::shared_ptr<task> get_task()

{

//获取task对象

if( ! idle_tasks_.empty() )

{

auto t = *idle_tasks_.begin();

idle_tasks_.pop_front(); //从空闲队列移除

busying_tasks_.insert(t); //加入忙队列

return t;

}

return std::tr1::shared_ptr<task>();

}

void allocate_tasks() //初始化线程池

{

for( int i = 0 ; i < max_size_; i ++ )

{

std::tr1::shared_ptr<task> t( new task());

try{

t->start();

idle_tasks_.push_back(t);

}catch( std::exception & e)

{ //超过进程最大线程数限制时,会跑出异常。。。

break;

}

}

}

private :

unsigned int max_size_;

std::list < std::tr1::shared_ptr<task> > idle_tasks_; //空闲任务队列

std::set < std::tr1::shared_ptr<task> > busying_tasks_;//正在执行任务的队列

std::queue< std::tuple< std::function<void()> , std::string, int > > wait_for_running_jobs_; //等待执行的任务

std::mutex mutex_;

std::condition_variable sync_;

bool stop_all_;

};

usage

[cpp] view
plaincopyprint?





static task_pool * tasks = nullptr;

static std::once_flag init_flag;

static std::once_flag finit_flag;

void run_job(const std::function<void()> & job , const std::string & file, int line )

{

if( tasks != nullptr)

tasks->submit_job(job, file,line);

}

void task_pool_init( unsigned max_task_size)

{

std::call_once(init_flag,[max_task_size]{

tasks = new task_pool(max_task_size);

tasks->start();

});

}

void task_pool_finit()

{

std::call_once(finit_flag,[]{ tasks->stop_all();});

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: