您的位置:首页 > 其它

15章 进程间通信之同步(互斥锁、条件变量、读写锁、信号量)

2017-09-15 15:54 435 查看
同步:

为了允许在线程和进程间共享数据,必须同步。互斥锁和条件变量是同步的基础。如果互斥锁或者条件变量存放在多个进程间共享的某一个内存区,那么通过它可以进行多进程同步。

互斥锁和条件变量

互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行设置(加锁)。在访问完成后释放(解锁)互斥量,对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都会变成可运行状态,第一个变为运行的线程就可以对互斥量加锁,其他线程就会看到互斥量依然是锁着的,只能回去再次等待它重新变为可用。在这种方式下,每次只有一个线程可以向前执行。

互斥锁系统调用

pthread_mutex_t;//互斥量系统定义的数据类型。

//静态分配则可以用对应宏初始化,动态分配则可以下面函数初始化。
int pthread_mutex_init (pthread_mutex_t *__mutex,
const pthread_mutexattr_t *__mutexattr);
//互斥量初始系统调用,使用默认属性则设置为NULL,否则传入一个已经初始化的属性结构体

int pthread_mutex_destroy (pthread_mutex_t *__mutex);
//如果动态分配(malloc)互斥量,则可以在释放(free)之前调用destory

int pthread_mutex_lock (pthread_mutex_t *__mutex);
//尝试给一个上锁,如果已经被另外一个线程锁住,那么函数将阻塞到该互斥锁解锁为止。
//如果多个线程阻塞等待互斥锁,那么系统将唤醒优先级最高的阻塞进程(线程也有优先级)。

int pthread_mutex_trylock (pthread_mutex_t *__mutex);
//试图对数据加锁,如果没有锁,则加锁,如果已经被其他进程锁了,则返回错误(EBUSY),且当前进程不阻塞继续执行。

int pthread_mutex_unlock (pthread_mutex_t *__mutex);
//给互斥量接锁,数据可供其他进程使用

pthread_mutexattr_t;//属性结构体,通过此结构体可以设置互斥锁属性
//互斥量属性结构体。对此赋值,传入上面init函数,可以初始化都有自定义属性的互斥量。属性注意3个,进程共享属性,健壮属性以及类型属性
int pthread_mutexattr_init (pthread_mutexattr_t *__attr);
//按照默认属性初始化,pthread_mutexattr_t数据结构体。
int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr);
//反初始化pthread_mutexattr_t数据结构体。
int pthread_mutexattr_getpshared (const pthread_mutexattr_t *
__restrict __attr,
int *__restrict __pshared);
//得到pthread_mutexattr_t数据类型中的进程共享属性的值。

/*
共享属性:可以使多个进程和线程一样共享数据,那么共享属性的互斥量生效了。
*/
int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr,
int __pshared);
//设置pthread_mutexattr_t数据类型中的进程共享属性的值。

/*
健壮性属性:假如一个线程取得了锁,但是线程退出锁未解开,这种情况下使用互斥量的行为是为定义的(是否取得锁已经不清楚了),为了应对这个问题,提出了健壮性属性。
*/
int pthread_mutexattr_getrobust (const pthread_mutexattr_t *__attr,
int *__robustness);
//得到pthread_mutexattr_t数据类型中的互斥量健壮性的值。
int pthread_mutexattr_getrobust_np (const pthread_mutexattr_t *__attr,
int *__robustness);
//设置pthread_mutexattr_t数据类型中的互斥量健壮性的值。

/*
类型属性:定义在错误使用了锁(锁了没解,没上锁确解锁,已解锁继续解锁)之后,函数返回的错误类型,以及系统处理之后的结果。
*/
int pthread_mutexattr_gettype (const pthread_mutexattr_t *__restrict
__attr, int *__restrict __kind);
//得到pthread_mutexattr_t数据类型中的互斥量类型的值。值不同对应互斥量加锁之后处理稍微不同。
int pthread_mutexattr_settype (pthread_mutexattr_t *__attr, int __kind);
//设置pthread_mutexattr_t数据类型中的互斥量类型的值。值不同对应互斥量加锁之后处理稍微不同。


死锁

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

1. 对互斥量锁两次

如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入死锁状态。

2. 程序中使用一个以上的互斥量时

如果允许一个线程一直占有第一个互斥量,并且在试图锁住第二个互斥量时处于阻塞状态,但是拥有第二个互斥量的线程也在试图锁住第一个互斥量。因为两个线程都在相互请求另一个线程拥有的资源,所以这两个线程都无法向前运行,于是就产生死锁。

条件变量系统调用

条件变量是线程可用的另一种同步机制。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。条件本身是由互斥量保护的。线程在改变条件状态之前必须首先锁住互斥量。其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定以后才能计算条件。

条件变量使用固定模式如下

1、对于发送端:

  上锁,设定条件为真,发送信号,解锁。

2、对于等待端:

  上锁,while判断,修改条件为假,接锁。

struct{
pthread_mutex_t mutex;
pthread_cond_t cond;
维护本条件的各个变量;
}var = {PTHREAD_MUTEX_INITIALIZER , PTHREAD_COND_INITIALIZER,.... };

//1、发送条件线程
pthread_mutex_lock(&var.mutex);
//设置条件为真;
pthread_mutex_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);

//2、等待条件真线程
pthread_mutex_lock(&var.mutex);
while(条件假)
pthread_cond_wait(&var.cond);
//修改条件
pthread_mutex_unlock(&var.mutex);


pthread_cond_wait(..):当前线程立即进入睡眠状态等待通知,同时互斥变量mutex解锁(这两步操作是原子的,不可分割),以便其它线程能进入临界区修改变量,当pthread_cond_wait返回时互斥锁再次锁住。

pthread_mutex_signal():线程调用此函数后,除了当前线程继续往下执行以外; 操作系统同时做如下动作:从condition_wait()中进入睡眠的线程中选一个线程唤醒, 同时被唤醒的线程试图锁(lock)住互斥量mutex, 当成功锁住后,线程就从condition_wait()中成功返回了。 必须注意,一定要在改变条件状态以后再给线程发信号。

3、利用while判断为了防止虚假唤醒

可能出现即使没有线程调用condition_signal, 原先调用condition_wait的函数也可能会返回。此时线程被唤醒了,但是条件并不满足,这个时候如果不对条件进行检查而往下执行,就可能会导致后续的处理出现错误。

虚假唤醒在linux的多处理器系统中/在程序接收到信号时可能回发生。在系统设计时应该可以避免虚假唤醒,但是这会影响条件变量的执行效率,而既然通过while循环就能避免虚假唤醒造成的错误,因此程序的逻辑就变成了while循环的情况。

#define PTHREAD_COND_INITIALIZER { { 0, 0, 0, 0, 0, (void *) 0, 0, 0 } }
typedef union
{
struct
{
int __lock;
unsigned int __futex;
__extension__ unsigned long long int __total_seq;
__extension__ unsigned long long int __wakeup_seq;
__extension__ unsigned long long int __woken_seq;
void *__mutex;
unsigned int __nwaiters;
unsigned int __broadcast_seq;
} __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;//此处为什么需要利用枚举?并且还涉及了数据对齐的操作。

//pthread_cond_t初始化位默认属性。
int pthread_cond_init (pthread_cond_t *__restrict __cond,
const pthread_condattr_t *__restrict __cond_attr);
//先利用反初始化,然后再释放条件变量底层内存空间。
int pthread_cond_destroy (pthread_cond_t *__cond);

//条件变为真之后,signal函数通知。给定时间内条件不满足,则返回一个对应错误码。
//进入wait,对互斥量解锁并等待上锁的条件改变,返回之后对互斥量上锁。
int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);

int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
const struct timespec *__restrict __abstime);
//指定函数返回时的系统时间,多了一个等待时间设置而已,时间是一个绝对时间。

//给条件变量发信号,一定要在条件改变之后再给线程发信号。

int pthread_cond_signal (pthread_cond_t *__cond);//唤醒一个线程
pthread_cond_broadcast (pthread_cond_t *__cond); //唤醒所有线程

//条件变量属性结构体以及初始化
/*
条件变量通常有:共享属性和时钟属性。共享属性可以指定互斥锁或者条件变量在不同进程间共享,而不是单单在单个进程内的不同线程间共享。
*/
pthread_condattr_t;
int pthread_condattr_init (pthread_condattr_t *__attr);
int pthread_condattr_destroy (pthread_condattr_t *__attr);

//条件变量属性设置函数
int pthread_condattr_getpshared (const pthread_condattr_t *
__restrict __attr,
int *__restrict __pshared);//取得共享属性
int pthread_condattr_setpshared (pthread_condattr_t *__attr,
int __pshared);//设置共享属性
int pthread_condattr_getclock (const pthread_condattr_t *
__restrict __attr,
__clockid_t *__restrict __clock_id);//取得时钟属性
int pthread_condattr_setclock (pthread_condattr_t *__attr,
__clockid_t __clock_id);//设置时钟属性


生产者-消费者问题

生产者-消费者属于同步的经典问题,也叫做有界缓冲区问题。一个或多个生产者(线程或进程)创建着一个个的数据条目,然后这些条目由一个或多个消费者 (线程或进程)处理。 下图中在单个进程中有多个生产者线程和单个消费者线程。生产者线程同步往缓冲区填充条目。消费者线程同步从缓冲区取出已经填充了条目进程判定生产者是否同步。



1、先进行生产者同步,生产者填充缓冲区,消费者仅用于检验生产者同步状况:

#define MAXNITEMS   1000000//最大条目
#define MAXNTHREADS 100
int nitems;//生产者和消费者读取
struct{//努力将共享数据和同步量放在一个结构中,这样编程技巧
pthread_mutex_t mutex;//互斥锁
int buff[MAXNITEMS];
int nput;
int nval;
}shared = {PTHREAD_MUTEX_INITIALIZER};//通过宏定义静态初始化第一个变量

void *produce(void *arg)//多个线程共同将buff中的nitems存储满
{                       //count记录每一个线程经过系统调用之后,存储了几个buff.
for( ; ; ){         //当存储满了之后,一个一个释放锁,然后退出.
pthread_mutex_lock(&shared.mutex);//取得锁
if(shared.nput >= nitems){
pthread_mutex_unlock(&shared.mutex);
return (NULL);
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
pthread_mutex_unlock(&shared.mutex);
*((int *)arg) += 1;
}
}
void *consum(void *arg)
{
int i;
for(i = 0 ; i < nitems ; i++){
if(shared.buff[i] != i)//消费者线程用来检查线程调用是否同步,加入已经同步,那么缓冲区的索引必定等于缓冲区数值
//假如不同步,那么就会出现多个同时访问的问题,出现异步错误
printf("buff[%d] = %d\n" , i , shared.buff[i]);
}
return (NULL);
}
//生产者之间的同步,直到所有生产者线程全部完成工作后,启动消费者线程.
int main(int argc , char **argv)
{
int i , nthreads , count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS],tid_consume;//多个生产线程存储区域,一个消费线程
if(argc != 3)
err_quit("usage: <items><nthreads>");
nitems = min(atoi(argv[1]) , MAXNITEMS);//指定条目数量
nthreads = min(atoi(argv[2]) , MAXNTHREADS);//指定线程数量
pthread_setconcurrency(nthreads);//设置线程并发特性
for(i = 0; i<nthreads ; i++){//创建指定的n个线程,并传递count参数
count[i] = 0;
pthread_create(&tid_produce[i] , NULL , produce , &count[i]);//传递参数,最后为向线程传递参数
}
for(i = 0 ; i < nthreads ; i++){//主线程等待所有线程结束,并打印线程返回参数
pthread_join(tid_produce[i] , NULL);
printf("count[%d] = %d\n" , i , count[i]);//查看每一个生产线程在缓冲区存储了几个条目
}
pthread_create(&tid_consume , NULL , consum, NULL);//创建消费者线程
pthread_join(tid_consume , NULL);//等待消费者结束
exit(0);
}


./ClientDebug/Client 1000000 10

count[0] = 84526

count[1] = 135714

count[2] = 110601

count[3] = 107207

count[4] = 128802

count[5] = 117924

count[6] = 75405

count[7] = 65720

count[8] = 81348

count[9] = 92753

如上就是系统调用之后,各个线程向缓冲区存储条目的个数。

2、1中仅仅实现了生产者同步(因为是填充完毕后才进程消费),现在修改成生产者启动后立即启动消费者线程,因此在生产者和消费者之间需要一层同步,通过增加consume_wait函数以轮询的方式来同步消费者线程和生产者线程以确保消费者只处理已由生产者存放的数据条目。这种方式就是浪费CPU的时间,代码如下。

#define MAXNITEMS   1000000//最大条目
#define MAXNTHREADS 100
int nitems;//生产者和消费者读取
struct{//努力将共享数据和同步量放在一个结构中,这样编程技巧
pthread_mutex_t mutex;//互斥锁
int buff[MAXNITEMS];
int nput;
int nval;
}shared = {PTHREAD_MUTEX_INITIALIZER};//通过宏定义静态初始化第一个变量

void *produce(void *arg)//多个线程共同将buff中的nitems存储满
{                       //count记录每一个线程经过系统调用之后,存储了几个buff.
for( ; ; ){         //当存储满了之后,一个一个释放锁,然后退出.
pthread_mutex_lock(&shared.mutex);//取得锁
if(shared.nput >= nitems){
pthread_mutex_unlock(&shared.mutex);
return (NULL);
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
pthread_mutex_unlock(&shared.mutex);
*((int *)arg) += 1;
}
}
void consume_wait(int i)//必须等待到生产着产生了第i个条目
{
for(; ; ){//循环,一次一次解锁又上锁,进行轮询浪费CPU时间.
pthread_mutex_lock(&shared.mutex);//必须取得锁,不然等待
if(i < shared.nput){//有了新的条目准备好了
pthread_mutex_unlock(&shared.mutex);
return ;
}
pthread_mutex_unlock(&shared.mutex);
}
}
void *consum(void *arg)
{
int i;
for(i = 0 ; i < nitems ; i++){
consume_wait(i);//消费者线程必须等待
if(shared.buff[i] != i)//消费者线程用来检查线程调用是否同步,加入已经同步,那么缓冲区的索引必定等于缓冲区数值
//假如不同步,那么就会出现多个同时访问的问题,出现异步错误
printf("buff[%d] = %d\n" , i , shared.buff[i]);
}
return (NULL);
}
//生产者之间的同步,直到所有生产者线程全部完成工作后,启动消费者线程.
int main(int argc , char **argv)
{
int i , nthreads , count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS],tid_consume;//多个生产线程存储区域,一个消费线程
if(argc != 3)
err_quit("usage: Client <items><nthreads>\n");
nitems = min(atoi(argv[1]) , MAXNITEMS);//指定条目数量
nthreads = min(atoi(argv[2]) , MAXNTHREADS);//指定线程数量
pthread_setconcurrency(nthreads);//设置线程并发特性
for(i = 0; i < nthreads ; i++){//创建指定的n个线程,并传递count参数
count[i] = 0;
pthread_create(&tid_produce[i] , NULL , produce , &count[i]);//传递参数,最后为向线程传递参数
}
pthread_create(&tid_consume , NULL , consum , NULL);//生产者完成后立马创建消费者线程,这时候所有线程并发执行。消费者轮询检查并发问题
for(i = 0 ; i < nthreads ; i++){//主线程等待所有线程结束,并打印线程返回参数
pthread_join(tid_produce[i] , NULL);//主进程
printf("count[%d] = %d\n" , i , count[i]);//查看每一个生产线程在缓冲区存储了几个条目
}
pthread_join(tid_consume , NULL);//等待消费者结束
exit(0);
}


3、2中以轮询的方式同步消费者和生产者,消费者线程浪费了许多CPU时间,这里利用互斥锁用于主锁,条件变量则用于等待可以解决这些问题:

#define MAXNITEMS   1000000//最大条目
#define MAXNTHREADS 100
int nitems;//生产者和消费者读取
int buff[MAXNITEMS];//将buff放在外面,但是线程通过条件变量和互斥锁同步
struct{
pthread_mutex_t mutex;//互斥锁
int nput;
int nval;
}put = {PTHREAD_MUTEX_INITIALIZER};//通过宏定义静态初始化第一个变量
struct{
pthread_mutex_t mutex;//配合条件变量的互斥量
pthread_cond_t  cond;//条件变量
int nready;//对应的条件
}nready = {PTHREAD_MUTEX_INITIALIZER , PTHREAD_COND_INITIALIZER , 0};//静态初始化
//上面结构体是一种习惯,用什么同步保护什么变量就放在一个结构体里面.很好的编程习惯.
void *produce(void *arg)//多个线程共同将buff中的nitems存储满
{                       //count记录每一个线程经过系统调用之后,存储了几个buff.
for( ; ; ){         //当存储满了之后,一个一个释放锁,然后退出.
pthread_mutex_lock(&put.mutex);//取得锁
if(put.nput >= nitems){
pthread_mutex_unlock(&put.mutex);
return (NULL);
}
buff[put.nput] = put.nval;//这一部分是对buff填充
put.nput++;
put.nval++;
pthread_mutex_unlock(&put.mutex);

pthread_mutex_lock(&nready.mutex);//通知消费者部分
nready.nready++;//修改条件为真
pthread_cond_signal(&nready.cond);//条件到达,发送信号
pthread_mutex_unlock(&nready.mutex);//释放对应互斥锁
*((int *)arg) += 1;//记录线程填充的缓冲区数目.
}
}

void *consum(void *arg)
{
int i;
for(i = 0 ; i < nitems ; i++){
pthread_mutex_lock(&nready.mutex);//取得互斥锁
while(nready.nready == 0)
pthread_cond_wait(&nready.cond , &nready.mutex);//等待条件到达.此函数原子做了两件事.
//第一:给互斥量暂时解锁,让其他线程可以进行
//第二:让调用线程睡觉,直到其他线程signal
//注意:此函数返回,立马重新给互斥锁上锁,并且再次检查条件,防止虚假的唤醒(条件没有到达的唤醒).
nready.nready--;//修改条件,等待下一次重复
pthread_mutex_unlock(&nready.mutex);//解锁

//下面可以判断生产者是否同步成功,因为已经被填充了,所以这里可以读,不需要同步,直接处理
if(buff[i] != i)//消费者线程用来检查线程调用是否同步,加入已经同步,那么缓冲区的索引必定等于缓冲区数值
printf("buff[%d] = %d\n" , i , buff[i]);
}
return (NULL);
}
//生产者之间的同步,直到所有生产者线程全部完成工作后,启动消费者线程.
int main(int argc , char **argv)
{
int i , nthreads , count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS],tid_consume;//多个生产线程存储区域,一个消费线程
if(argc != 3)
err_quit("usage: Client <items><nthreads>\n");
nitems = min(atoi(argv[1]) , MAXNITEMS);//指定条目数量
nthreads = min(atoi(argv[2]) , MAXNTHREADS);//指定线程数量
pthread_setconcurrency(nthreads);//设置线程并发特性
for(i = 0; i < nthreads ; i++){//创建指定的n个线程,并传递count参数
count[i] = 0;
pthread_create(&tid_produce[i] , NULL , produce , &count[i]);//传递参数,最后为向线程传递参数
}
pthread_create(&tid_consume , NULL , consum , NULL);//创建消费者线程
for(i = 0 ; i < nthreads ; i++){//主线程等待所有线程结束,并打印线程返回参数
pthread_join(tid_produce[i] , NULL);//主进程
printf("count[%d] = %d\n" , i , count[i]);//查看每一个生产线程在缓冲区存储了几个条目
}
pthread_join(tid_consume , NULL);//等待消费者结束
exit(0);
}


./Client 100000 6

count[0] = 32362

count[1] = 6375

count[2] = 7970

count[3] = 3660

count[4] = 42501

count[5] = 7132

直接同步7个进程,同时访问一个缓存区,从来不会产生冲突。这就是多线程同步的魔力。很牛逼。同步。。。

4、关于条件变量的虚假唤醒

读写锁

读写锁通常用于一个写入者,多个读入者的模型。也就是只要没有线程在修改某个给定的数据,那么任意数量的线程都可以拥有该数据的访问权。仅当没有线程读或者修改某个给定数据时候,当前线程才可以修改它。读写锁用于读的称为共享锁,用于写的称为独占锁。一个简单的日常类比就是访问银行账户。多个线程可以同时读出某一个账户的收支结余,一旦一个线程需要更新收支结余,该线程必须等待所有读出者完成收支结余读出,然后允许更新线程修改收支结余,直到更新完毕,任何读出者都允许读收支结余。

读写锁系统调用

pthread_rwlock_t;//读写锁数据类型,可以静态初始化也可以动态初始化(可以指定属性等等)
//初始化读写锁数据以及其属性,属性NULL表示默认
int pthread_rwlock_init (pthread_rwlock_t *__restrict __rwlock,
const pthread_rwlockattr_t *__restrict
__attr);
//释放内存之前,对读写锁进行清理工作。

int pthread_rwlock_destroy (pthread_rwlock_t *__rwlock);
//读模式下锁定读写锁。
int pthread_rwlock_rdlock (pthread_rwlock_t *__rwlock);
//写模式下锁定读写锁。
int pthread_rwlock_wrlock (pthread_rwlock_t *__rwlock);
//读写模式下进行解锁。
int pthread_rwlock_unlock (pthread_rwlock_t *__rwlock);
//可以获取锁则返回0,锁住读写锁;不可以获取返回EBUSY,进程继续执行。
int pthread_rwlock_tryrdlock (pthread_rwlock_t *__rwlock);
//可以获取锁则返回0,锁住读写锁;不可以获取返回EBUSY,进程继续执行。
int pthread_rwlock_trywrlock (pthread_rwlock_t *__rwlock);

//属性初始化默认,通常就是共享属性,设置了可以使得读写锁可用于多个不同的线程
int pthread_rwlockattr_init (pthread_rwlockattr_t *__attr);
//属性反初始化
int pthread_rwlockattr_destroy (pthread_rwlockattr_t *__attr);
//获取进程共享属性
int pthread_rwlockattr_getpshared (const pthread_rwlockattr_t *
__restrict __attr,
int *__restrict __pshared);
//设置进程共享属性
int pthread_rwlockattr_setpshared (const pthread_rwlockattr_t *
__restrict __attr,
int *__restrict __pshared);


使用互斥锁和条件变量实现读写锁(高级内容)

记录锁

Posix信号量

Posix有名信号量:使用文件系统路径名标识,可以用于进程或线程同步。

Posix基于内存信号量:存放在共享内存中,可用于进程或线程同步。

信号量对应系统调用

信号量除了可以像互斥锁那样使用外,信号量还有互斥锁没有提供的特性:互斥锁必须总是由锁住它的线程解锁,但是信号量的挂出却不必由执行过它的等待操作的同一线程执行。就是说post一次信号量加1,wait一次信号量减一。不论是哪个线程这样做,对应的路径信号量都是执行对应的操作。当信号量为0,wait的时候,就会阻塞。

typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
/*
打开或者创建一个一存在的有名信号量。
name:对应存在的路径名。
oflag:可以是0、O_CREAT(创建不管是否之前存在)、O_CREAT|O_EXCL(创建假如之前存在,那么报错)。
0:表示读取操作
如果指定O_CREAT,那么第三 四个参数必须。
第三个参数:可以是mode权限位。
第四个参数:指定信号量初始值。
成功返回指向信号量指针随后用于其他函数,失败返回SEM_FAILED错误。
*/
sem_t *sem_open (const char *__name, int __oflag, ...) __THROW;

/*
关闭有名信号量。并且在进程终止时候,内核对其上打开的有名信号量自动执行信号量关闭操作。关闭并没有删除,因为有名信号量至少是内核持续性的,即使没有进程打开该信号量,一样存在。
*/
int sem_close (sem_t *__sem) ;

/*
关闭一个信号量并没有从系统内核删除,通过此函数传入有名信号量路径从系统删除。且必须等待引用计数(所有打开进程都调用了sem_close)为0才可以。
*/
int sem_unlink (const char *__name) ;

/*
测试指定信号量的值,值大于0,那么将数值减1,并立即返回。如果该值等于0,将调用线程投入睡眠,直到该值变为大于0,这是再减1并返回。这里的测试减1属于原子操作。
其中trywait当值为0时候,并不将线程投入睡觉,并返回EAGAIN错误。
*/
int sem_wait (sem_t *__sem);
int sem_trywait (sem_t *__sem);

/*
指定信号量值加1,并唤醒正在等待该信号量数值变为正数的任意线程。
getvalue:将信号量当前数值返回在sval指向的区域。如果信号量当前已经上锁,那么返回值要么是0要么是负数,负数的绝对值表示等待信号量解锁的线程数。
*/
int sem_post (sem_t *__sem) ;
int sem_getvalue (sem_t *__restrict __sem, int *__restrict __sval)

/*
前面都是操作有名信号量。初始化基于内存的信号量,由应用程序分配信号量的内存空间。因为共享内存,将基于内存信号量放入共享内存,然后将共享内存映射到当前进程,在可以当前进程中改变放入内存中信号量值,也就是将信号量搬到一个都可以读取到的地方,然后通过指针修改,这是可以理解的,很容易。
sem:存放信号量地址,因为函数修改上面的值,仅仅传递指针即可
pshared:0或者>0
0:待初始化信号量在同一个进程的各个线程共享。
>0:此时信号量必须放入共享内存,可以供所以进程访问共享内存时访问该信号量。至少随进程持续性,但是放入共享内存区则就一只存在直到共享内存区被删除。
sem_open:返回一个指向某个sem_t变量指针,由该函数本身分配并初始化。
sem_init:第一个参数是指向变量的指针,那么就是该变量由调用者分配,函数仅仅初始化而已。这里就是传入指针和返回指针的区别,很大的区别,我们必须明白这里的区别。
value:初始化数值。
*/
int sem_init (sem_t *__sem, int __pshared, unsigned int __value);

/*
用于摧毁基于内存共享的信号量
*/
int sem_destroy (sem_t *__sem);


来一个简单例程解释上述函数:

semcreate:

int main(int argc , char *argv[])
{
int c , oflag;
sem_t *sem;
unsigned int value;

oflag = O_RDWR | O_CREAT;//队列权限和创建
value = 1;

while( (c = getopt(argc , argv , "ei:")) != -1){
switch(c){
case 'e':
oflag |= O_EXCL;//创建失败退出功能
break;
case 'i':
value = atoi(optarg);
}

}
if(optind != argc -1)
err_quit("[-e] [-i initalvalue] <pathname>");
sem = Sem_open("/tmp/test" , oflag , FILE_MODE , value);//创建信号量,后面两个参数必须使用
Sem_close(sem);
exit(0);
}


semunlink:

int main(int argc , char *argv[])
{
if(argc != 2)
err_quit("usage:semlink <name>");
Sem_unlink(argv[1]);//通过路径访问可用于多个进程
exit(0);
}


semgetvalue:

int main(int argc , char *argv[])
{
sem_t *sem;//返回指针可以节约开销.
int val;
if(argc != 2)
err_quit("usage: semgetvalue <name>");
sem = Sem_open(argv[1] , 0);//因为是打开,所以第二参数设置为0,没有后面参数
Sem_getvalue(sem , &val);//获取信号量对应的数值
printf("value = %d\n" , val);
}


semwait:

int main(int argc , char *argv[])//将有名信号量减去1
{
sem_t *sem;
int val;
if(argc != 2)
err_quit("usage: semwait <name>");

sem = Sem_open(argv[1] , 0);//打开
Sem_wait(sem);//减去1
Sem_getvalue(sem , &val);
printf("pid %ld has sem,value = %d\n" , (long)getpid() , val);

//pause();//阻塞
exit(0);
}


sempost:

int main(int argc , char *argv[])//将有名信号量加上1
{
sem_t *sem;
int val;
if(argc != 2)
err_quit("usage: sempost <name>");

sem = Sem_open(argv[1] , 0);//打开
Sem_post(sem);//加上1
Sem_getvalue(sem , &val);
printf("value = %d\n" , val);

exit(0);
}


$ semcreate wangjun //通过默认数值创建信号量。

$ semgetvalue wangjun//获取默认创建信号量值为1。

value = 1

$ semwait wangjun//等待信号量,这时候值减去1。

pid 12352 has sem,value = 0

$ semwait wangjun//进程阻塞,通过中断,终止这个进程

$ semgetvalue wangjun//重新获取信号量数值,还是为0.证明信号量值由内核维护,等待进程异常终止,并没有释放信号量。这个区别于记录锁。

value = 0

$ semwait wangjun &//后台运行,将阻塞1

$ semwait wangjun &//后台运行,将阻塞2

$ sempost wangjun//后台运行进程释放一个

$ sempost wangjun//后台运行进程再次释放一个

信号量搞定生产者-消费者问题

前面讨论了利用互斥锁同步生产者,以及条件变量和互斥锁来同步生产者和消费者,增加环形缓冲区的功能支持。

int nitems;//生产数量
#define NBUFF 20 //提供的唤醒缓冲区
struct{
int buff[NBUFF];
sem_t *mutex , *nempty , *nstored;//保护共享区,记录缓冲区空区域,记录缓冲区已填充区域
}shared;//保护缓冲区放在一个结构体

int main(int argc , char *argv[])
{
pthread_t tid_produce , tid_consume;
if(argc != 2){
err_quit("<items>");
}
nitems = atoi(argv[1]);//nitems定义生产者生产的数目量
shared.mutex = Sem_open(Px_ipc_name("sem_mutex") , O_CREAT|O_EXCL , FILE_MODE , 1);//创建互斥信号量
shared.nempty = Sem_open(Px_ipc_name("sem_nempty") , O_CREAT|O_EXCL, FILE_MODE , NBUFF);//创建信号量,空白NBUFF个
shared.nstored = Sem_open(Px_ipc_name("sem_nstored") , O_CREAT|O_EXCL, FILE_MODE , 0);//创建信号量,存储0个
Pthread_create();
exit(0);
}
void *produce(void *argv)
{
int i ;
for( i = 0 ; i < nitems ; ++i){
Sem_wait(shared.nempty);//等待有一个空位置存放,第一次由NBUFF变为NBUFF-1
Sem_wait(shared.mutex);//当多个生产者时候,可以保护共享区
shared.buff[i % NBUFF] = i;//这里体现了环形缓冲区效果
Sem_post(shared.mutex);//反过来挂起
Sem_post(shared.nstored);//反过来挂起,告知存储了一个条目,第一次执行由0变为1
}
return (NULL);
}
void *consume(void *argv)
{
int i ;
for( i = 0 ; i < nitems ; ++i){
Sem_wait(shared.nstored);//至少存储一个,这样才可以检测
Sem_wait(shared.mutex);//消费者在消费,你不要生产了
if(shared.buff[i % NBUFF] != i)
printf("buff[%d] = %d\n" , shared.buff[i % NBUFF]);//检查效果
Sem_post(shared.mutex);//反过来挂起
Sem_post(shared.nempty);//通知已经消费完毕,又有一个空位置了.
}
return (NULL);
}


SystemV信号量

在内核中维护,可用于进程或线程间的同步。

计数信号量:其数值在0到某个限制值之间的信号量。Posix 信号量都是单个计数信号量。

计数信号量集:一个或多个计数信号量构成的一个集合。Sys V信号量都是计数信号量集。因此系统维护的结构会多一点,相应的操作也会复杂一点。

SysV信号量系统调用

struct ipc_perm//权限结构,供下面结构使用
{
__key_t __key;          /* Key.  */
__uid_t uid;            /* Owner's user ID.  */
__gid_t gid;            /* Owner's group ID.  */
__uid_t cuid;           /* Creator's user ID.  */
__gid_t cgid;           /* Creator's group ID.  */
unsigned short int mode;        /* Read/write permission.  */
unsigned short int __pad1;
unsigned short int __seq;       /* Sequence number.  */
unsigned short int __pad2;
__syscall_ulong_t __glibc_reserved1;
__syscall_ulong_t __glibc_reserved2;
};

/*内核为每一个信号集,维护一个下述结构*/
struct semid_ds
{
struct ipc_perm sem_perm;     /* operation permission struct */
__time_t sem_otime;           /* last semop() time */
__syscall_ulong_t __glibc_reserved1;
__time_t sem_ctime;           /* last time changed by semctl() */
__syscall_ulong_t __glibc_reserved2;
__syscall_ulong_t sem_nsems;      /* number of semaphores in set 信号集的个数,每个信号量由下面结构描述 */
__syscall_ulong_t __glibc_reserved3;
__syscall_ulong_t __glibc_reserved4;
};
struct sem{//信号量集中每一个信号量对应一个此结构
ushort_t semval;//非负的信号量值。
short sempid;//最后操作该信号量的进程PID。
ushort_t semncnt;//等待其值增长的进程数。
ushort_t semzcnt;//等待其值变为0的进程数。
}
//semid_ds中声明几个信号量,那么内核就为此信号集维护几个上述sem结构体(sem结构数组)。对应于下面相关函数的操作。

/*
通过第四个参数设置或者访问已经存在信号量集
key:信号量标识符
nsems:指定信号集中信号量数目,如果访问可置为0
semflg:权限操作
注意:调用该函数创建信号量集,则该集合中每一个信号量关联的各个sem结构并没有初始化。必须通过调用semctl操作。
*/
int semget (key_t __key, int __nsems, int __semflg);

/*
创建或者访问已经存在信号量集
__semid:semget返回的信号标识符
__semnum:待控制信号集内某一个成员。0、1、2、nsem-1
__cmd:控制命令,对应的东西都存储在下面联合体里面
#define GETPID      11 //get sempid
#define GETVAL      12 //get semval
#define GETALL      13 //get all semval's
#define GETNCNT     14 //get semncnt
#define GETZCNT     15 //get semzcnt
#define SETVAL      16 //set semval
#define SETALL      17 //set all semval's
//Control commands for `msgctl', `semctl', and `shmctl'.
#define IPC_RMID    0    //Remove identifier.
#define IPC_SET     1    //Set `ipc_perm' options.
#define IPC_STAT    2    //Get `ipc_perm' options.
union semum:第四个联合体参数依据第三个参数设定。是存储ids返回的值,还是存储设置ids的值。
union semun
{
int val;               <= value for SETVAL
struct semid_ds *buf;      <= buffer for IPC_STAT & IPC_SET
unsigned short int *array;     <= array for GETALL & SETALL
struct seminfo *__buf;     <= buffer for IPC_INFO
};

*/
int semctl (int __semid, int __semnum, int __cmd, ...);

/*
操作semget打开的信号集。
__semid:semget返回的信号标识符
__sops:指向sembuf结构数组,数组内每一个元素,对应某一个信号量的操作。
struct sembuf
{
unsigned short int sem_num;//指定操作此结构对应哪一个信号量。0、1、sem-1
short int sem_op;//指定对信号的操作
//op>0,调用者等待semval变为0,否则立即返回。如果semval不为0,则相应信号量semzcnt+1,并阻塞进程到semval为0。
//op<0,参见课本。调用者等待semval >= |op|。
//op=0,参见课本。调用者等待semval >= 0。
short int sem_flg;
};
__nsops:说明sembuf结构数组元素个数,供函数通过指针访问所有数组元素操作。因为这是通过传递指针进去访问数组。
*/
int semop (int __semid, struct sembuf *__sops, size_t __nsops);
//由上面这些函数,说明Sys V信号量复杂性增加了。


简单例子:

semcreat.c:

创建信号量

int main(int argc , char *argv[])//通过key,创建消息队列
{
int oflag , nsems , semid;
oflag = SVSEM_MODE | IPC_CREAT | IPC_EXCL;//模式加上创建属性
if(argc < 3)
err_quit("<pahtname><nsems>");
nsems = atoi(argv[2]);
semid = Semget(Ftok(argv[1] , 0) , nsems , oflag);
exit(0);
}


semsetvalue.c:

设置信号量集合中的信号量数值

这里必须注意 arg.array必须赋值一个已经初始化的指针。

int main(int argc , char *argv[])//设置某个信号量集中的所有数值
{
int semid , nsems , i;

struct semid_ds seminfo;//对照联合体给出,这个地方给出变量而不是指针,因为变量初始化了地址
//可以传递给arg.buf,加入是*seminfo,那么指针没有初始化,必定提示Bad address
unsigned short int *array;
union semun arg;//联合体,向设置传参数,其中参数并没有初始化

if(argc < 2)
err_quit("<pathname>");
semid = Semget(Ftok(argv[1] , 0 ) , 0 , 0);//读取信号量集,所以后面二三参数为0

arg.buf = &seminfo;//地址初始化之后,才可以存储数据进去.否则肯定未定义.静态初始化或者动态分配
Semctl(semid , 0 , IPC_STAT , arg);//通过arg获取信号集合结构信息
nsems = arg.buf->sem_nsems;//获取信号集中信号量数目

if(argc != nsems +2)//函数名, 路径 , 信号量设置参数
err_quit("fdf");

array = (unsigned short *)Calloc(nsems , sizeof(unsigned short));//因为需要设置信号量数值,所以需要动态分配内存,然后设置
arg.array = array;//动态内存分配,存储对应数值
for(i = 0 ; i < nsems ; ++i){
array[i] = atoi(argv[2+i]);//将数值设置成对应信号量值
}
Semctl(semid , 0 , SETALL , arg);//必须通过联合体传参数进去,否则报错

exit(0);
}


semgetvalue.c:

arg.buf的赋值必须注意,给一个已经初始化的地址。

int main(int argc , char *argv[])//读取并输出某个信号量集中的所有数值
{
int semid , nsems , i;

struct semid_ds seminfo;//对照联合体给出
unsigned short int *array;
union semun arg;//联合体,向设置传参数,其中参数并没有初始化

if(argc != 2)
err_quit("<pathname>");
semid = Semget(Ftok(argv[1] , 0 ) , 0 , 0);//读取信号量集,所以后面二三参数为0

arg.buf = &seminfo;//地址初始化之后,才可以存储数据进去.否则肯定未定义.静态初始化或者动态分配
Semctl(semid , 0 , IPC_STAT , arg);//通过arg获取信号集合结构信息
nsems = arg.buf->sem_nsems;//获取信号集中信号量数目,便于后面动态分配内存

array = (unsigned short *)Calloc(nsems , sizeof(unsigned short));//因为需要设置信号量数值,所以需要动态分配内存,然后设置
arg.array = array;//动态内存分配,存储对应数值

Semctl(semid , 0 , GETALL , arg);//必须通过联合体传参数进去,否则报错
for(i = 0 ; i < nsems ; ++i){
printf("semval[%d] = %d\n" , i , array[i]);//访问地址,通过数组的形式
}

exit(0);
}


semops.c:

int main(int argc , char *argv[])//对某个信号量集,执行一数组的操作
{
int semid , i;
struct sembuf *ptr;//这是一个数组指针,对应信号量集中某个信号量的操作说明
int flag = 0;//操作选项

semid = Semget(Ftok(argv[1] , 0) , 0 , 0);

ptr = (struct sembuf *)Calloc(3 , sizeof(struct sembuf));//动态分配内存空间,数组形式
for(i = 0 ; i < 3 ; ++i){//对每一个数组成员初始化
ptr[i].sem_num = i;//设置信号量数字
ptr[i].sem_op = atoi(argv[2 + i]);//对信号量如何操作
ptr[i].sem_flg = flag;//nowait还是undo操作
}//动态分配数组,或者静态分配数组都可以
Semop(semid , ptr , 3);//通过指针传递sembuf对信号集中的信号量进行全部设置
}


假如不指定flag=SEM_UNDO,

各种同步原语之间区别

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐