您的位置:首页 > 运维架构 > Linux

《Linux多线程服务端编程》—线程同步精要

2016-09-06 14:16 411 查看
并发编程的两种基本模型:message passing 和 shared memory。

使用message passing 可以跨机器,分布式系统的架构更具有一致性,扩容起来也较容易。

线程同步的四项原则

按重要性排序:

首要原则是尽量最低限度地共享对象,减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露;如果要暴露,优先考虑immutable对象;实在不行才暴露可修改的对象,并用同步措施来充分保护它。

其次是使用高级的并发编程构件,如TaskQueue、Producer-Consumer Queue、CountDownLatch等等。

最后不得已必须使用底层同步原语(primitives)时,只用非递归的互斥器和条件变量,慎用读写锁,不要用信号量。

除了使用atomic整数之外,不自己编写lock-free代码,也不要用“内核级”同步原语。不凭空猜测“哪种做法性能会更好”,比如spin lock vs. mutex。

下面着重讲第3条:底层同步原语的使用。

互斥器

互斥器(mutex)保护了临界区,任何一个时刻最多只能有一个线程在此mutex划出的临界区内活动。单独使用mutex时,主要为了保护共享数据。

一些原则:

用RAII手法封装mutex的创建、销毁、加锁、解锁这四个操作。

只用非递归的mutex(即不可重入的mutex)。

不手工调用lock()和unlock()函数,一切交给栈上的Guard对象的构造和析构函数负责。Guard对象的生命期正好等于临界区。这样保证了始终在同一个函数同一个scope内对某个mutex加锁和解锁。避免在foo()内加锁,然后跑到bar()内解锁,也避免在不同的语句分支中分别加锁、解锁。这种做法称为“Scoped Locking”。

在每次构造Guard对象的时候,思考一路上(调用栈)已经持有的锁,防止因加锁顺序不同而导致死锁。由于Guard对象是栈上对象,看函数调用栈就能分析用锁的情况,非常便利。

注:所谓“重入”,常见的情况是,程序执行到某个函数foo()时,收到信号,于是暂停目前正在执行的函数,转到信号处理函数,而这个信号处理函数的执行过程中,又恰恰也会进入到刚刚执行的函数foo(),这样便发生了所谓的重入。此时如果foo()能够正确的运行,而且处理完成后,之前暂停的foo()也能够正确运行,则说明它是可重入的。

次要原则有:

不使用跨进程的mutex,进程间通信只用TCP sockets。

加锁、解锁在同一个线程,线程a不能去unlock线程b已经锁住的mutex(RAII自动保证)。

别忘了解锁(RAII自动保证)。

不重复解锁(RAII自动保证)。

必要的时候可以考虑用PTHREAD_MUTEX_ERRORCHECK来排错。

只使用非递归的mutex

mutex分为递归(可重入,reentrant)和非递归(不可重入),它们的唯一区别:同一线程可以重复对递归mutex加锁,但是不能重复对非递归mutex加锁。

递归mutex不用考虑一个线程会把自己锁死,但是却隐藏了一些问题,典型情况是你以为拿到一个锁就能修改对象了,但是可能外层代码也已经拿到了锁,正在修改或读取同一个对象。这时将会造成意向不到的后果。

而如果使用非递归mutex,则程序将会死锁——把程序的逻辑错误尽早暴露出来,而且死锁更容易debug。

条件变量(condition variable)

互斥器是加锁原语,用来排他性地访问共享数据,它不是等待原语。

如果需要等待某个条件成立,我们应该使用条件变量——一个或多个线程等待某个布尔表达式为真,即等待别的线程“唤醒”它,其学名叫做“管程(monitor)”。

对条件变量的使用包括两个动作:

1. 线程等待某个条件, 条件为真则继续执行,条件为假则将自己挂起(避免busy wait,节省CPU资源);

2. 线程执行某些处理之后,条件成立;则通知等待该条件的线程继续执行。

对于wait端:

1. 必须与mutex一起使用(防止race-condition),该布尔表达式的读写需受此mutex保护。

2. 在mutex已上锁的时候才能调用wait()。

3. 把判断布尔条件和wait()放到while循环中。

代码:

muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque<int> queue;

int dequeue()
{
MutexLockGuard lock(mutex);
while(queue.empty()) //queue.empty()为布尔表达式,必须用循环;必须在判断之后再wait()
{
cond.wait(); //原子地unlock mutex并进入等待,不会与enqueue死锁
// wait()执行完毕时会自动重新加锁
}
assert(!queue.empty());
int top = queue.front();
queue.pop_front();
return top;
}


为什么是while循环来等待条件变量而不是if语句来判断:

这是因为可能会存在虚假唤醒(spurious wakeup)的情况。

也就是说,即使没有线程调用condition_signal, 原先调用condition_wait的函数也可能会返回。此时线程被唤醒了,但是条件并不满足,这个时候如果不对条件进行检查而往下执行,就可能会导致后续的处理出现错误。

虚假唤醒在linux的多处理器系统中,在程序接收到信号时可能会发生。在Windows系统和JAVA虚拟机上也存在。在系统设计时应该可以避免虚假唤醒,但是这会影响条件变量的执行效率,而既然通过while循环就能避免虚假唤醒造成的错误,因此程序的逻辑就变成了while循环的情况。

对于signal/broadcast端:

1. 不一定要在mutex已上锁的情况下调用signal(理论上)。

2. 在signal之前一般要修改布尔表达式。

3. 修改布尔表达式通常要用mutex保护。

4. 注意区分signal与broadcast:broadcast通常用于表明状态变化,signal通常用于表明资源可用。

代码:

ivoid enqueue(int x)
{
MutexLockGuard lock(mutex);
queue.push_back(x);
cond.notify(); //可以移出临界区之外
}


CountDownLatch(倒计时)是一种常用且易用的同步手段,其用途有二:

1. 主线程发起多个子线程,等这些子线程各自都完成一定的任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。

2. 主线程发起多个子线程,子线程等待主线程,主线程完成其他一些任务之后,通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令。

class CountDownLatch : boost::noncopyable
{
public:
explicit CountDownLatch(int count); //倒数几次
void wait(); //等待计数值变为0
void countDown(); //计数减1

private:
mutable MutexLock mutex_; //顺序很重要,先mutex后condition
Condition condition_;
int count_;
};

CountDownLatch::CountDownLatch(int count)
: mutex_(),
condition_(mutex_),
count_(count)
{ }

void CountDownLatch::wait()
{
MutexLockGuard lock(mutex_);
while (count_)
{
condition_.wait();
}
}

void CountDownLatch::countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_notifyAll();
}
}


封装MutexLock、MutexLockGuard、Condition

// Use as data member of a class, eg.
//
// class Foo
// {
//  public:
//   int size() const;
//
//  private:
//   mutable MutexLock mutex_;
//   std::vector<int> data_; // GUARDED BY mutex_
// };
class MutexLock : boost::noncopyable
{
public:
MutexLock()
: holder_(0)
{
MCHECK(pthread_mutex_init(&mutex_, NULL));
}

~MutexLock()
{
assert(holder_ == 0);
MCHECK(pthread_mutex_destroy(&mutex_));
}

// must be called when locked, i.e. for assertion
bool isLockedByThisThread() const
{
return holder_ == CurrentThread::tid();
}

void assertLocked() const
{
assert(isLockedByThisThread());
}

// internal usage

void lock() //仅供MuetexLockGuard调用,严禁用户代码调用
{
MCHECK(pthread_mutex_lock(&mutex_)); //两行顺序不能反
assignHolder();
}

void unlock() //仅供MuetexLockGuard调用,严禁用户代码调用
{
unassignHolder(); //两行顺序不能反
MCHECK(pthread_mutex_unlock(&mutex_));
}

pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_;
}

private:
friend class Condition;

class UnassignGuard : boost::noncopyable
{
public:
UnassignGuard(MutexLock& owner)
: owner_(owner)
{
owner_.unassignHolder();
}

~UnassignGuard()
{
owner_.assignHolder();
}

private:
MutexLock& owner_;
};

void unassignHolder()
{
holder_ = 0;
}

void assignHolder()
{
holder_ = CurrentThread::tid();
}

pthread_mutex_t mutex_;
pid_t holder_;
};

class MutexLockGuard : boost::noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard()
{
mutex_.unlock();
}

private:

MutexLock& mutex_;
};

// Prevent misuse like:
// MutexLockGuard(mutex_);
// 以上将产生一个临时对象又马上销毁了
// 正确写法:MutexLockGuard lock(mutex_);
#define MutexLockGuard(x) error "Missing guard object name"


下面这个muduo::Condition class简单封装了Pthreads condition variable(boost、C++的线程库中,同步原语过于庞杂。如果你不需要太高的灵活性,可以自己封装几个简简单单一看就明白的class来用——提供灵活性固然是本身,然而在不需要灵活性的地方把代码写死,更需要大智慧)。

class Condition : boost::noncopyable
{
public:
explicit Condition(MutexLock& mutex)
: mutex_(mutex)
{
MCHECK(pthread_cond_init(&pcond_, NULL));
}

~Condition()
{
MCHECK(pthread_cond_destroy(&pcond_));
}

void wait()
{
MutexLock::UnassignGuard ug(mutex_);
MCHECK(pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()));
}

// returns true if time out, false otherwise.
bool waitForSeconds(double seconds);

void notify()
{
MCHECK(pthread_cond_signal(&pcond_));
}

void notifyAll()
{
MCHECK(pthread_cond_broadcast(&pcond_));
}

private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};

// returns true if time out, false otherwise.
bool muduo::Condition::waitForSeconds(double seconds)
{
struct timespec abstime;
// FIXME: use CLOCK_MONOTONIC or CLOCK_MONOTONIC_RAW to prevent time rewind.
clock_gettime(CLOCK_REALTIME, &abstime);

const int64_t kNanoSecondsPerSecond = 1e9;
int64_t nanoseconds = static_cast<int64_t>(seconds * kNanoSecondsPerSecond);

abstime.tv_sec += static_cast<time_t>((abstime.tv_nsec + nanoseconds) / kNanoSecondsPerSecond);
abstime.tv_nsec = static_cast<long>((abstime.tv_nsec + nanoseconds) % kNanoSecondsPerSecond);

MutexLock::UnassignGuard ug(mutex_);
return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
}


mutex和condition都是非常底层的同步原语,主要用来实现更高级的并发编程工具,并不鼓励到处使用。

线程安全的Singleton实现

使用pthread_once:

#include <boost/noncopyable.hpp>
#include <assert.h>
#include <stdlib.h> // atexit
#include <pthread.h>

namespace muduo
{

namespace detail
{
// This doesn't detect inherited member functions!
// http://stackoverflow.com/questions/1966362/sfinae-to-check-for-inherited-member-functions template<typename T>
struct has_no_destroy
{
template <typename C> static char test(typeof(&C::no_destroy)); // or decltype in C++11
template <typename C> static int32_t test(...);
const static bool value = sizeof(test<T>(0)) == 1;
};
}

template<typename T>
class Singleton : boost::noncopyable
{
public:
static T& instance()
{
pthread_once(&ponce_, &Singleton::init);
assert(value_ != NULL);
return *value_;
}

private:
Singleton();
~Singleton();

static void init()
{
value_ = new T();
if (!detail::has_no_destroy<T>::value)
{
::atexit(destroy); //注册销毁函数
}
}

static void destroy()
{
typedef char T_must_be_complete_type[sizeof(T) == 0 ? -1 : 1];
T_must_be_complete_type dummy; (void) dummy;

delete value_;
value_ = NULL;
}

private:
static pthread_once_t ponce_;
static T*             value_;
};

//静态变量初始化
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;

template<typename T>
T* Singleton<T>::value_ = NULL;

}


它用pthread_once_t来保证lazy-initialization的线程安全,线程安全性由Pthreads库保证。另外,我们通过atexit(3)来提供销毁功能。

使用方法:

Foo& foo = Singleton<Foo>::instance();


关于noncopyable

boost::noncopyable 用于防止复制,如果是自己实现,需要把构造函数、拷贝构造,复制构造都私有。

关于pthread_once

有时候我们需要对一些posix变量只进行一次初始化。如果我们进行多次初始化程序就会出现错误。

在传统的顺序编程中,一次性初始化经常通过使用布尔变量来管理。控制变量被静态初始化为0,而任何依赖于初始化的代码都能测试该变量。如果变量值仍然为0,则它能实行初始化,然后将变量置为1。以后检查的代码将跳过初始化。

但是在多线程程序设计中,事情就变的复杂的多。如果多个线程并发地执行初始化序列代码,可能有2个线程发现控制变量为0,并且都实行初始化,而该过程本该仅仅执行一次。

如果我们需要对一个posix变量静态的初始化,可使用的方法是用一个互斥量对该变量的初始化进行控制。但有时候我们需要对该变量进行动态初始化,pthread_once就会方便的多

原型:

int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));


功能:本函数使用初值为PTHREAD_ONCE_INIT的once_control变量保证init_routine()函数在本进程执行序列中仅执行一次。

在多线程编程环境下,尽管pthread_once()调用会出现在多个线程中,init_routine()函数仅执行一次,究竟在哪个线程中执行是不定的,是由内核调度来决定。

Linux Threads使用互斥锁和条件变量保证由pthread_once()指定的函数执行且仅执行一次,而once_control表示是否执行过。

如果once_control的初值不是PTHREAD_ONCE_INIT(Linux Threads定义为0),pthread_once() 的行为就会不正常。

在Linux Threads中,实际”一次性函数”的执行状态有三种:NEVER(0)、IN_PROGRESS(1)、DONE (2),如果once初值设为1,则由于所有pthread_once()都必须等待其中一个激发”已执行一次”信号,因此所有pthread_once ()都会陷入永久的等待中;如果设为2,则表示该函数已执行过一次,从而所有pthread_once()都会立即返回0。

关于destroy

防止delete一个不完全对象

typedef char T_must_be_complete_type[sizeof(T) == 0 ? -1 : 1];
T_must_be_complete_type dummy; (void) dummy;


typedef定义一个char数组类型 T_MUST_BE_COMPELET_TYPE :

char[-1]:如果T只声明没有定义,为不完全类型, 没定义就没有析构函数,delete就不会调用析构函数了;

char[1]:T是完全类型,即有定义,有delete操作,可以调用析构函数。

由于sizeof不能用于不完全类型,所以事实上如果T是不完全类型,编译的时候将会在sizeof处报错。

作者使用下面这两句的意图:

// 如果编译时加Werror=unused-local-typedefs这个选项
// 则检查到未使用的local-typedefs会报错,因此我们下面使用一下...
T_must_be_complete_type dummy;

// 如果编译时加Wno-unused-parameter这个选项
// 则检查到未使用的parameter会报错,因此我们下面使用一下...
(void) dummy;


关于has_no_destroy

has_no_destroy用于判断一个类型是否需要销毁。

当且仅当一个类型是类类型,而且包含有no_destroy数据成员时,才不需要销毁。

以下代码使用了C++的SFINAE(Substitution Failure Is Not An Error,匹配失败并不是一种错误)特性(在另一篇博文末尾有介绍:http://blog.csdn.net/jiange_zh/article/details/52260080)。

template<typename T>
struct has_no_destroy
{
template <typename C>
static char test(typeof(&C::no_destroy)); // or decltype in C++11
template <typename C>
static int32_t test(...);
const static bool value = sizeof(test<T>(0)) == 1;
};

// 在Singleton的init函数中使用
static void init()
{
value_ = new T();
if (!detail::has_no_destroy<T>::value)
{
::atexit(destroy);
}
}


这里我还是简单分析下这里的逻辑吧:

当我们写下下面代码的时候:

Foo& foo = Singleton<Foo>::instance();


编译器将会用Foo来实例化 Singleton< T >,从而产生Singleton< Foo >实例。在instance()中,会调用init(),在inti函数中,将会实例化出detail::has_no_destroy< Foo >,并访问value静态变量的值,其值如下:

const static bool value = sizeof(test<T>(0)) == 1;


我们知道,sizeof是一个运算符,其值会在编译的时候计算出来,对一个函数sizeof,实际上是对该函数的返回类型进行sizeof,因此sizeof可能对两个test函数的返回类型char或者int32_t进行操作。如果是char,则sizeof结果为1,value为true,如果是int32_t,sizeof结果不为1,value为false。

因此要达到我们的目的,需要让“包含有no_destroy数据成员的类类型”匹配到第一个test函数,而其他情况匹配到第二个test函数上。

假设T是Foo,则实际上为:

sizeof(test<Foo>(0));


这个时候编译器会进行匹配(匹配规则见上述的博文),当匹配第一个test函数的时候,编译器发现Foo是一个类类型,因此使用::是正确的,如果Foo包含no_destroy数据成员,则这个函数匹配成功,value为true,如果Foo不包含no_destroy数据成员,则这个函数匹配失败——由于“匹配失败不是错误”,所以编译器并不会报错,而是匹配下一个test函数,这一次匹配成功了,value为false。

如果T是一个内置类型,则::的使用时错误的,因此第一个test匹配失败,编译器转而匹配第二个test,value为false。

以上的这些操作都是在编译期间完成的——也就是说,编译完成之后,代码中实际上只剩下value=true 或者value=false这一行代码了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: