您的位置:首页 > 编程语言 > C语言/C++

C++11学习笔记-----互斥量以及条件变量的使用

2018-02-07 19:31 246 查看
在多线程环境中,当多个线程同时访问共享资源时,由于操作系统CPU调度的缘故,经常会出现一个线程执行到一半突然切换到另一个线程的情况。以多个线程同时对一个共享变量做加法运算为例,自增的汇编指令大致如下,先将变量值存放在某个寄存器中(eax),然后对寄存器进行加一,随后将结果回写到变量内存上

mov [#address#] eax;    // 这里#address#简要表示目标变量的地址   // 1
inc eax;    // 2
mov eax [#address#];    // 3


假设存在两个线程同时对变量a进行加法操作,a初值为0,如果其中一个线程在第一步执行完后被切走,那么最终a的结果可能不是2而是1



由图片可知,由于cpu调度的缘故,多线程下同时对共享变量进行操作,可能会导致最终的结果并不是期望值。所以,为了保护共享变量,保证同一时刻只能允许一个线程对共享变量进行操作,就需要借助互斥量的协助

Linux下的原生互斥量

Linux下提供了原生互斥量api,定义在头文件<pthread.t>中。互斥量,形象点理解就是一把锁,在对共享变量进行操作之前,先上锁,只有获得锁的这个线程能够继续运行,而其他线程运行到上锁语句时,会阻塞在那里直到获得锁的那个线程执行解锁操作,随后继续争抢锁,抢到锁的线程接着执行,没有抢到锁的线程继续阻塞

示例:利用互斥锁解决多线程共享变量问题

在上一篇中提到了创建10个线程同时对一个共享变量进行自增,发现结果和预期不同,接下来利用互斥量解决这一问题

#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>

#include <iostream>
#include <vector>

long long int total = 0;
pthread_mutex_t m;

void* thread_task(void* arg)
{
for(int i = 0; i < 10000; ++i)
{
/* 对total进行加法之前先上锁,保证同一时刻只能有一个线程执行++total */
::pthread_mutex_lock(&m);
++total;
/* 解锁 */
::pthread_mutex_unlock(&m);
}
::pthread_exit(nullptr);
}

int main()
{
/* 初始化互斥量 */
::pthread_mutex_init(&m, nullptr);
std::vector<pthread_t> tids;
for(int i = 0; i < 10; ++i)
{
pthread_t tid;
::pthread_create(&tid, nullptr, thread_task, nullptr);
tids.emplace_back(tid);
}
for(auto& tid : tids)
::pthread_join(tid, nullptr);

/* 释放互斥量 */
::pthread_mutex_destroy(&m);

std::cout << total << std::endl;
return 0;
}


C++11下的互斥量和条件变量

互斥量

对比linux原生的库函数,C++11提供的互斥量突出的特点有

无需考虑互斥量的初始化和销毁,在类的构造和析构函数中管理,无需使用者操心

采用RAII对互斥量进行了不同封装,提供了更加友好的上锁机制

C++11提供的互斥量位于<mutex>头文件中,提供的接口有

lock,上锁

try_lock,尝试上锁,如果失败则返回false

unlock,解锁

这三个函数和linux下的接口差不多,其实也没什么不同嘛~。事实上,多数程序都不直接使用std::mutex,标准库采用RAII(资源获取时就进行初始化)对std::mutex进行了封装,使用起来当然是方便得不得了

简单的锁机制lock_guard

最简单的封装是std::lock_guard,单纯利用RAII,构造时上锁,析构时解锁,使用示例为

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

int main()
{
long long int total = 0;
std::mutex m;
std::vector<std::thread> threads;
for(int i = 0; i < 10; ++i)
{
threads.emplace_back(
[&m, &total]
{
for(int i = 0; i < 10000; ++i)
{
{
std::lock_guard<std::mutex> lock(m);
++total;
}
}
}
);
}
for(auto& th : threads)
th.join();

std::cout << total << std::endl;
return 0;
}


想对于共享数据的提供保护,使用std::lock_guard是完全没有问题的,进入共享区前上锁,离开后解锁

更灵活的锁unique_lock

稍微复杂的封装是std::unique_lock,它提供了更灵活的上锁机制,即通过构造函数的参数进行设置,分别可以

直接上锁

延迟上锁,仅保存互斥量,不进行上锁工作

尝试上锁

但是多数情况下采用默认的直接上锁就可以了,而在std::unique_lock的生存期间,使用者也可以对其进行解锁再上锁等工作,这个作用体现在和条件变量的配合上

条件变量

标准库中条件变量位于头文件<condition_variable>中

其中有三个接口用于阻塞当前线程,常用的是wait

void wait(std::unique_lock<std::mutex>& lock);
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);


原子操作释放锁lock,阻塞当前线程,并将当前线程添加到*this上的等待线程列表,等待notify_one或者notify_all调用时结束阻塞(第二个重载当pred返回true时也会结束阻塞)

此外,还有两个接口用于通知一个或多个等待线程,将其从阻塞状态变为非阻塞

void notify_one() noexcept;
void notify_all() noexcept;


示例,利用互斥量和条件变量实现线程池

线程池工作原理

线程池的工作原理是预先创建若干线程,同时维护一个任务队列,每个线程不断地从任务队列中取出任务并执行,使用者可以随时向任务队列中添加新任务。当任务队列为空,线程池中的线程要么执行自己那个没有结束的任务,要么处于睡眠状态

在这个问题模型中,任务队列就相当于共享变量,同一时刻只能有一个线程访问任务队列并从中取出任务,而添加任务时也需要避免添加和取出同时进行,这就需要互斥量的协助,凡是涉及到对任务队列的存和取,都需要事先上锁。

另外,如果任务队列为空,那么每个线程都不断的上锁,取任务(发现为空),解锁,再上锁,取任务(发现为空),解锁…这样的busy loop会极大消耗cpu,造成了不必要的开销,所以需要引入条件变量,当任务队列为空时,采用条件变量令线程睡眠

线程池定义

可以明确的是,线程池除了构造析构函数外,需要提供一个接口用于调用者添加任务,所以线程池的定义可以明确如下

#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>

class ThreadPool
{
public:
ThreadPool(std::size_t threadNums);
~ThreadPool();

void stop() { quit_ = true; }
public:
/* 用于添加任务,std::future<>用于保存函数f的执行结果 */
template <class F, class... Args>
auto enqueue(F&& f, Args... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::atomic<bool> quit_;
std::mutex mutex_;
std::condition_variable cond_;
};


构造函数

当线程池构造时,创建threadNums个线程,每个线程都从任务队列中取出任务然后执行

ThreadPool::ThreadPool(std::size_t threadNums)
: quit_(false)
{
for(std::size_t i = 0; i < threadNums; ++i)
{
threads_.emplace_back(
[this]
{
while(!this->quit_)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->mutex_);
/* 利用条件变量,等待直到线程池退出或者任务队列不为空 */
cond_.wait(lock, [this]() { return this->quit_ || !this->tasks_.empty(); });
if(this->quit_) return;
task = this->tasks_.front();
this->tasks_.pop();
}
task();
}
}
);
}
}


析构函数

析构函数用于回收线程资源

ThreadPool::~ThreadPool()
{
stop();
cond_.notify_all();
for(auto& th : threads_)
th.join();
}


添加任务

enqueue函数用于添加任务,涉及到了一些std::future的内容,这里先简单看看

/* class... 表示不定长参数列表 */
template <class F, class... Args>
/*
* auto会根据->后的内容自动推导返回类型
* std::future用于保存函数运行结果
* std::result_of用于获取函数运行结果
* std::packaged_task<T>是一个函数包,类似std::function,用于包装函数
* std::packaged_task<T>::get_future用于返回函数运行结果
* std::unique_lock<std::mutex> 上锁(这里也可以用std::lock_guard
*/
auto ThreadPool::enqueue(F&& f, Args... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push([task]() { (*task)(); });
return res;
}


测试代码

int main()
{
ThreadPool pool(4);
std::vector<std::future<int>> results;
for(int i = 0; i < 10; ++i)
{
results.emplace_back(
pool.enqueue(
[i]
{
/* std::this_thread::sleep_for(std::chrono::seconds(1)); */
return i * i;
}
)
);
}
for(auto&& result : results)
std::cout << result.get() << std::endl;
return 0;
}


小结

互斥锁是多线程环境中不可缺少的重要部分,用于保护共享资源免受cpu调度的危害。另外,条件变量和互斥锁配合使用可以避免busy loop带来的不必要损耗
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐