【网络编程】半同步--半异步线程池源码分析之任务队列(基于C++11)
2017-07-27 09:01
686 查看
前言
对于C++的学习,感觉如果只看书,学习效率很低。很多新知识新概念理解起来都很困难,而C++11更是引入了更多新的概念和知识。而在学习服务端这部分,什么“同步–异步”也把人搞得很晕如果看不懂书,看不懂概念,不如就找个例子分析一下,起码能让你快速上手。
想起了老陈给我们上《操作系统》时讲的要把理论和实例化的东西相结合,对于我这种笨笨的娃尤其适宜,只有理论实在看不懂ToT。
本文主要通过一个实例(半同步–半异步线程池)的源码分析,体会服务端中的并发模式,同时,由于代码几乎完全使用C++11开发,也会针对C++11的新概念进行讲解。
所以本文适合于希望学习C++11/服务端开发的小伙伴。
这个例子出自《深入应用C++11—代码优化与工程级应用》的第9章。源代码已经开源在作者的github上。 原代码地址
正文
同步队列模板类SyncQueue
让我们先看一下私有成员变量:private: std::list<T> m_queue; //使用std::list来保存任务 std::mutex m_mutex; //C++11的互斥锁 用于线程同步 std::condition_variable m_notEmpty; //C++11的条件变量,用于保证线程执行的次序 std::condition_variable m_notFull; //同上 int m_maxsize; //队列中允许保存的最大任务数 bool m_needStop; //用于终止队列的标志,默认为false };
构造函数
//构造函数:这里实现的较为简单,主要是记录了队列允许的最大任务数,同时将终止队列的标志m_needStop设置为false SyncQueue(int maxsize) : m_maxsize(maxsize), m_needStop(false) {}
让我们先来看最简单的3个public方法,主要使用互斥量std::mutex 来保证对队列访问的互斥
//这三个public方法涉及到队列的计数(size方法),因此在调用时要加锁 //这里使用std::lock_guard()来更加简便 bool Empty() //判断是否为空 { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() //判断是否为满 { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxsize; } size_t Size() //得到当前的任务数量 { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); }
std::mutex的一般用法如下:
#include <mutex> std::mutex m; void fun(){ m.lock(); //临界区 m.unlock(); }
这样直接加锁解锁很直观,但是需要我们手动unlock,有时候忘了unlock就尴尬了。所以我们使用简单的std::lock_guard locker(m_mutex) 这个locker在构造时加锁,在析构时解锁,所以我们可以保证这三个函数互斥访问临界区。
两个主要的操作Take & Put
其实Take和Put的流程基本相同,主要使用了条件变量condition_variable来控制线程的执行顺序,保证队列空时Put线程运行,队列满时Take线程运行。条件变量的使用需要配合锁,并且必须是std::unique_lock
//从任务队列中获取任务,重载的Take主要为了实现批量取任务 //如果参数为std::list<T> 则直接将所有的任务都取出(放到这个list里) //否则只取出一个 void Take(std::list<T>& list) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); // lambda表达式捕获了this指针则可以直接调用private 方法 //如果队列不为空则往下执行,否则阻塞 //如果m_needStop标志为true 则直接结束 if(m_needStop) return; list = std::move(m_queue);//通过move移动语义,减少一次复制 m_notFull.notify_one(); //唤醒一个等待Put的线程 } //重载的Take方法执行流程一致,只是取出队列中最前面的任务 void Take(T& x) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty();}); if(m_needStop) return ; x = m_queue.front(); m_queue.pop_front(); //取出任务 m_notFull.notify_one(); }
条件变量的使用方式为
std::mutex m_mutex; std::condition_varible m_cv; void func(){ std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return isTrue();}); //临界区 m_cv.notify_one(); }
即首先线程竞争获得锁locker,注意必须使用std::unique_lock
然后调用wait方法.这里使用了wait方法的重载版本,首先判断谓词是否为true,若是直接继续.否则,线程阻塞,同时wait会将locker解锁(让其它线程能拿到锁),等待别的线程通过m_cv的notify_*方法将其唤醒.
一旦唤醒,则继续拿锁判断谓词.
wait还有一种版本,不需谓词,直接阻塞解锁,等待notify_*方法唤醒,唤醒后拿锁继续执行.
可以看到wait方法有自动解锁拿锁的过程,因此必须使用std::unique_lock .
//向任务队列中添加任务,使用私有方法Add //这里Put是采用Add实现的,之所以要这样做就是为了覆盖三种情况(常量左值,非常量左值,右值) void Put(const T &x)// 参数为常量左值 { Add(x); //直接传递 } void Put(T &&x) //参数为左值或者右值 { Add(std::forward<T>(x)); //使用完美转发,保存参数的类型 } private: template <typename F> void Add(F &&x) { //流程同理,通过条件变量保证访问顺序 std::unique_lock<std::mutex> locker(m_mutex); m_notFull.wait(locker, [this]{return m_needStop || NotFull();}); if(m_needStop) return; m_queue.push_back(std::forward<F>(x));//再次完美转发放入队列 m_notEmpty.notify_one(); }
终止操作
//停止所有任务 void Stop() { //这里依然是要对队列操作,所以我们依然要加锁 { std::lock_guard<std::mutex> locker(m_mutex); m_needStop = true;//设置标志 } // 这里线程放下了锁(因为一个代码块结束,locker就已经析构,放下了锁) //这个代码块是为了下面唤醒线程之后,线程能立刻拿到锁退出. m_notFull.notify_all(); //唤醒正在等待Take和Put的线程,然后所有线程依此拿锁return m_notEmpty.notify_all(); }
判断队列是否为空/满
private: //这两个方法由于调用时线程已经拿到锁,所以不用再在计数前加锁了 bool NotFull() const //指定对象为const 防止修改 { bool full = m_queue.size() >= m_maxsize; if(full) std::cout << "buffer is full ...please wait" << std::endl; else return !full; } bool NotEmpty() const { bool empty = m_queue.empty(); if(empty){ std::cout << "buffer is empty... please wait. Thread ID:" << std::this_thread::get_id() << std::endl; } return !empty; }
相关文章推荐
- 【网络编程】半同步--半异步线程池源码分析之线程池(基于C++11)
- OKHttp网络框架源码解析(一)okHttp框架同步异步请求流程和源码分析
- 变态的libDispatch源码分析-全局队列异步延时任务处理过程-计时轮询
- iOS网络编程之同步、异步、请求队列
- memcached源码分析-----半同步半异步网络模型
- Linux网络编程【四】:进程池、线程池原理及简单线程池源码分析
- [C++11]半同步半异步线程池实现与分析
- ios--进程/多线程/同步任务/异步任务/串行队列/并行队列(对比分析)
- GCD编程dispatch_sync(同步)和dispatch_async(异步)方式执行并发队列任务区别
- iOS -- 进程/多线程/同步任务/异步任务/串行队列/并行队列(对比分析)
- 变态的libDispatch源码分析-全局队列异步延时任务处理过程-设置计时器与插入ds
- iOS网络编程之同步、异步、请求队列
- ios 网络编程之同步,异步,请求队列
- 变态的libDispatch源码分析-全局队列异步延时任务处理过程-原理与创建ds
- iOS网络编程之同步、异步、请求队列
- 基于Sockets的编程中多任务同步的处理机制
- 网络编程释疑之:同步,异步,阻塞,非阻塞
- GCD使用 串行并行队列 与 同步异步执行的各种组合 及要点分析
- android网络编程 -- Socket 通信(03) 点对点Android聊天室实现(带服务器) [附源码分析]
- glib库异步队列和线程池代码分析