您的位置:首页 > 运维架构 > Linux

Linux多线程 互斥锁与条件变量使用

2013-06-07 13:26 686 查看
2013/6/7 caichengyu
如果你认真看下去,必将学习到不少知识,如有错还希望您提出来~
一、互斥锁

为什么要使用锁?

linux下,锁的种类很多,包括互斥锁,文件锁,读写锁······其实信号量说白了也是一种锁。互斥量从本质上说就是一把锁, 提供对共享资源的保护访问。

1. 初始化:

线程的互斥量数据类型是pthread_mutex_t,使用之前要初始化。

静态初始化:

pthread_mutex_t  testlock = PTHREAD_MUTEX_INITIALIZER

动态初始化:

pthread_mutex_t  testlock;

pthread_mutex_init(&testlock, NULL);

原型:

int pthread_mutex_init(pthread_mutex_t *restrictmutex, constpthread_mutexattr_t *restric
attr);

int pthread_mutex_destroy(pthread_mutex_t *mutex);

返回值: 成功则返回0, 出错则返回错误编号.

说明: 如果使用默认的属性初始化互斥量,
只需把attr设为NULL. 其他值在以后讲解。

2. 互斥操作:

对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁.在完成了对共享资源的访问后,要对互斥量进行解锁。

2.1 加锁函数:

原型:

int pthread_mutex_lock(pthread_mutex_t *mutex);

int pthread_mutex_trylock(pthread_mutex_t *mutex);

返回值: 成功则返回0, 出错则返回错误编号.

说明:

trylock函数:

这个函数是非阻塞调用模式。也就是说,如果互斥量没被锁住,trylock函数将把互斥量加锁,并获得对共享资源的访问权限;如果互斥量被锁住了,
trylock函数将不会阻塞等待而直接返回EBUSY,表示共享资源处于忙状态。

 

3.2解所函数:

原型: int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值: 成功则返回0, 出错则返回错误编号.

3. 死锁:

死锁主要发生在有多个依赖锁存在时, 会在一个线程试图以与另一个线程相反顺序锁住互斥量时发生. 如何避免死锁是使用互斥量应该格外注意的东西。

防止死锁:

① 对共享资源操作前一定要获得锁。

② 完成操作以后一定要释放锁。

③ 尽量短时间地占用锁。

④ 如果有多锁, 如获得顺序是ABC连环扣, 释放顺序也应该是ABC。

⑤ 线程错误返回时应该释放它所获得的锁。

下面给个测试小程序进一步了解互斥,一个好的例子让你明白很多。

mutex互斥信号量锁住的不是一个变量,而是阻塞住一段程序。如果对一个mutex变量testlock, 执行了第一次pthread_mutex_lock(testlock)之后,在unlock(testlock)之前的这段时间内,如果有其他线程也执行到了pthread_mutex_lock(testlock),这个线程就会阻塞住,直到之前的线程unlock之后才能执行,由此,实现同步,也就达到保护临界区资源的目的。

 

原版程序:

#include<stdio.h>

#include<pthread.h>

static pthread_mutex_t testlock;

pthread_t test_thread;

void *test()

{

       pthread_mutex_lock(&testlock);

       printf("threadTest() \n");

       pthread_mutex_unlock(&testlock);

}

int main()

{

       pthread_mutex_init(&testlock,NULL);

       pthread_mutex_lock(&testlock);

       printf("Mainlock \n");

       pthread_create(&test_thread,NULL, test, NULL);

       sleep(1);//更加明显的观察到是否执行了创建线程的互斥锁

       printf("Main unlock \n");

       pthread_mutex_unlock(&testlock);

       sleep(1);

pthread_join(test_thread,NULL);

       pthread_mutex_destroy(&testlock);

       return 0;

}

make

gcc -D_REENTRANT -lpthread -o test test.c

结果:

Main lock

Main unlock

thread Test()

.

二、条件变量

为什么要先讲互斥锁 再讲 条件变量 呢?

条件变量的使用总是结合互斥锁,为防止竞争。

两个步骤:

a. 线程等待"条件变量的条件成立"而挂起。pthread_cond_wait()

b. 线程使"条件成立"(给出条件成立信号)。pthread_cond_signal()

1.创建和注销  

1.1创建

静态方式:  

pthread_cond_t cond = PTHREAD_COND_INITIALIZER   

动态方式:

pthread_cond_t  cond;

pthread_cond_init(cond, NULL);

 

函数:   

int   pthread_cond_init(pthread_cond_t  *cond,   pthread_condattr_t   *cond_attr)    

尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。 

1.2注销:

只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程。

函数:  

int   pthread_cond_destroy(pthread_cond_t   *cond)   

2.等待和激发  



       2.1 等待


       int  pthread_cond_wait(pthread_cond_t  *cond,   pthread_mutex_t   *mutex) 

       int  pthread_cond_timedwait(pthread_cond_t   *cond,  pthread_mutex_t   *mutex,   const  struct   timespec   *abstime)   

1)       无条件等待pthread_cond_wait()

2)       计时等待pthread_cond_timedwait()

[计时等待方式:如果在给定时刻前条件没有满足,则返回ETIMEOUT,结束等待。]

无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的竞争条件(Race   Condition)。

mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。

在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。执行pthread_cond_wait()时自动解锁互斥量(如同执行了
pthread_unlock_mutex),并等待条件变量触发。这时线程挂起,不占用 CPU时间,直到条件变量被触发。

2.2激发

pthread_cond_signal(pthread_cond_t  *cond)

发送信号

注意:要先让pthread_cond_wait进入等待信号状态,才能调用pthread_cond_signal发送信号,才有效,不能让pthread_cond_signal在pthread_cond_wait前面执行。

 

原版程序:

#include<stdio.h>

#include<pthread.h>//多线程所用头文件

#include<semaphore.h> //信号量使用头文件

pthread_cond_tg_cond /*=PTHREAD_MUTEX_INITIALIZER*/; //申明条锁,并用宏进行初始化

pthread_mutex_tg_mutex ;

//线程执行函数

voidthreadFun1(void)

{

int i;

pthread_mutex_lock(&g_mutex); //1

pthread_cond_wait(&g_cond,&g_mutex);//如g_cond无信号,则阻塞

for( i = 0;i < 2; i++ ){

printf("thread threadFun1.\n");

sleep(1);

}

pthread_cond_signal(&g_cond);

pthread_mutex_unlock(&g_mutex);

}

int main(void)

{

pthread_t id1; //线程的标识符

pthread_t id2;

pthread_cond_init(&g_cond,NULL); //也可以程序里面初始化

pthread_mutex_init(&g_mutex,NULL); //互斥变量初始化

int i,ret;

ret = pthread_create(&id1,NULL,(void *)threadFun1, NULL);

if ( ret!=0 ) { //不为0说明线程创建失败

printf ("Create pthread1 error!\n");

exit (1);

}

sleep(5); //等待子线程先开始

pthread_mutex_lock(&g_mutex); //2

//给个开始信号,注意这里要先等子线程进入等待状态在发信号,否则无效

pthread_cond_signal(&g_cond);

pthread_mutex_unlock(&g_mutex);

pthread_join(id1,NULL);

pthread_cond_destroy(&g_cond); //释放

pthread_mutex_destroy(&g_mutex); //释放

return 0;

}

 

我程序:

pthread_cond_t g_cond/*=PTHREAD_MUTEX_INITIALIZER*/; //申明条锁,并用宏进行初始化

pthread_mutex_t g_mutex ;

void Thread1(void)

{

       inti;

       printf( "thread1 : lock\n" );

       pthread_mutex_lock(&g_mutex);

       printf( "thread1 : cond wait\n" );

       pthread_cond_wait(&g_cond,&g_mutex);//如g_cond无信号,则阻塞

 

       for(i = 0;i < 2; i++ )

       {

              printf("thread1: thread1.\n");

              sleep(1);

       }

 

       //printf( "thread1 : cond signal\n" );

       //pthread_cond_signal(&g_cond);

       printf("thread1 : unlock\n" );

       pthread_mutex_unlock(&g_mutex);

 

}

 

int condtest(void)

{

       pthread_tid1; //线程的标识符

       pthread_tid2;

       pthread_cond_init(&g_cond,NULL);//也可以程序里面初始化

       pthread_mutex_init(&g_mutex,NULL);//互斥变量初始化

       inti,ret;

       ret= pthread_create(&id1,NULL,(void *)Thread1, NULL);

       if( ret!=0 )

       {    //不为0说明线程创建失败

              printf ("Create pthread1error!\n");

              exit (1);

       }

       sleep(5);//等待子线程先开始

       printf( "main : lock\n" );

       pthread_mutex_lock(&g_mutex);//

       printf( "main : cond signal\n" );

       pthread_cond_signal(&g_cond);//给个开始信号,注意这里要先等子线程进入等待状态在发信号,否则无效

       printf( "main : unlock\n" );

       pthread_mutex_unlock(&g_mutex);

 

       sleep(3);

       printf( "main : join\n" );

       pthread_join(id1,NULL);

       pthread_cond_destroy(&g_cond);//释放

       pthread_mutex_destroy(&g_mutex);//释放

       return0;

}

/*

试验出真理,这样便于理解。

结果:

thread1 : lock

thread1 : cond wait

main : lock

main : cond signal

main : unlock

thread1: thread1.

thread1: thread1.

thread1 : cond signal

thread1 : unlock

main : join

*/

 

解析:

1) 因为main函数有sleep(5);所有先执行线程threadFun1,执行了加锁,然后进入条件等待(主要条件等待会先进行解锁,然后等待)。

 

2)  当main函数中5秒延时结束后,进行加锁操作,并向threadFun1中的条件等待函数发送信号(此时threadFun1接收到信号,本来要加锁,但是main函数加锁了,所有阻塞在那)。

 

3)  main函数发送信号结束,进行解锁,此时threadFun1就可加锁,便执行threadFun1函数。

当程序进入pthread_cond_wait等待后,将会把g_mutex进行解锁,当离开pthread_cond_wait之前,g_mutex会重新加锁。

 

补充知识:

互斥锁属性

* PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。  互斥锁的属性在创建锁的时候指定,在LinuxThreads实现中仅有一个锁类型属性,不同的锁类型在试图对一个已经被锁定的互斥锁加锁时表现不同。当前(glibc2.2.3,linuxthreads0.9)有四个值可供选择:

* PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。

* PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。

* PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。

 

 

接下来这些经典程序我就没去分析了,有空再分析。

现在来看一段典型的应用:看注释即可。

#include <pthread.h>   

#include <unistd.h>   

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;   

static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;   

struct node {   

int n_number;   

struct node *n_next;   

} *head = NULL;   

/*[thread_func]*/   

static void cleanup_handler(void *arg)   

{   

printf("Cleanup handler of second thread.\n");   

free(arg);   

(void)pthread_mutex_unlock(&mtx);   

}   

static void *thread_func(void *arg)   

{   

struct node *p = NULL;   

pthread_cleanup_push(cleanup_handler, p);   

while (1) {   

pthread_mutex_lock(&mtx);           //这个mutex主要是用来保证pthread_cond_wait的并发性  
while (head == NULL)  {              

//这个while要特别说明一下,单个pthread_cond_wait功能很完善,为何这里要有一个while(head
== NULL)呢?因为pthread_cond_wait里的线程可能会被意外唤醒,如果这个时候head!= NULL,则不是我们想要的情况。这个时候,应该让线程继续进入pthread_cond_wait

pthread_cond_wait(&cond, &mtx);        

// pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx,然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立而被唤醒,唤醒后,该进程会先锁定先pthread_mutex_lock(&mtx);,再读取资源,用这个流程是比较清楚的/*block-->unlock-->wait()
return-->lock*/

}

}

}

 

 

#include <unistd.h>

#include <stdio.h>

#include <stdlib.h>

#include <sys/errno.h>

#include <sys/types.h>

#include <signal.h>

#include <pthread.h>

#define    min(a,b)    ((a) < (b) ? (a) : (b))

#define    max(a,b)    ((a) > (b) ? (a) : (b))

#define    MAXNITEMS         1000000

#define    MAXNTHREADS            100

int        nitems;            /* read-only by producer andconsumer */

struct {

pthread_mutex_t    mutex;

int    buff[MAXNITEMS];

int    nput;

int    nval;

} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *), *consume(void*);

/* include main */

int

main(int argc, char **argv)

{

int            i, nthreads,count[MAXNTHREADS];

pthread_t    tid_produce[MAXNTHREADS],tid_consume;

if (argc != 3) {

printf("usage: prodcons3 <#items> <#threads>\n");

return -1;

}

nitems = min(atoi(argv[1]), MAXNITEMS);

nthreads = min(atoi(argv[2]), MAXNTHREADS);

/* 4create all producers and one consumer */

for (i = 0; i < nthreads; i++) {

count[i] = 0;

pthread_create(&tid_produce[i], NULL, produce, &count[i]);

}

pthread_create(&tid_consume, NULL, consume, NULL);

/* 4wait for all producers and the consumer */

for (i = 0; i < nthreads; i++) {

pthread_join(tid_produce[i], NULL);

printf("count[%d] = %d\n", i, count[i]);   

}

pthread_join(tid_consume, NULL);

exit(0);

}

/* end main */

void *

produce(void *arg)

{

for ( ; ; ) {

pthread_mutex_lock(&shared.mutex);

if (shared.nput >= nitems) {

pthread_mutex_unlock(&shared.mutex);

return(NULL);        /* array is full,we're done */

}

shared.buff[shared.nput] = shared.nval;

shared.nput++;

shared.nval++;

pthread_mutex_unlock(&shared.mutex);

*((int *) arg) += 1;

}

}

/* include consume */

void

consume_wait(int i)

{

for ( ; ; ) {

pthread_mutex_lock(&shared.mutex);

if (i < shared.nput) {

pthread_mutex_unlock(&shared.mutex);

return;            /* an item is ready */

}

pthread_mutex_unlock(&shared.mutex);

}

}

void *

consume(void *arg)

{

int        i;

for (i = 0; i < nitems; i++) {

consume_wait(i);

if (shared.buff[i] != i)

printf("buff[%d] = %d\n", i, shared.buff[i]);

}

return(NULL);

}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: