您的位置:首页 > 运维架构 > Linux

Linux程序设计学习笔记----多线程编程之线程同步条件变量

2014-08-15 13:40 761 查看
转载请注明出处:http://blog.csdn.net/suool/article/details/38582521.

基本概念与原理

互斥锁能够解决资源的互斥访问,但是在某些情况下,互斥并不能解决问题,比如两个线程需 要互斥的处理各自的操作,但是一个线程的操作仅仅存在一种条件成立的情况下执行,一旦错过不可再重现,由于线程间相互争夺cpu资源,因此在条件成立的时候,该线程不一定争夺到cpu而错过,导致永远得不到执行.....

因此需要某个机制来解决此问题,更重要的是,线程仅仅只有一种情况需要执行操作,不停地申请资源导致资源浪费,而Linux提供了条件变量加互斥锁解决该问题.

条件变量变量也是出自POSIX线程标准,另一种线程同步机制,。主要用来等待某个条件的发生。可以用来同步同一进程中的各个线程。当然如果一个条件变量存放在多个进程共享的某个内存区中,那么还可以通过条件变量来进行进程间的同步。

每个条件变量总是和一个互斥量相关联,条件本身是由互斥量保护的,线程在改变条件状态之间必须要锁住互斥量。条件变量相对于互斥量最大的优点在于允许线程以无竞争的方式等待条件的发生。当一个线程获得互斥锁后,发现自己需要等待某个条件变为真,如果是这样,该线程就可以等待在某个条件上,这样就不需要通过轮询的方式来判断添加,大大节省了CPU时间。

在互斥量一文中说过:互斥量是用于上锁,而不是用于等待;现在这句话可以加强为:互斥量是用于上锁,条件变量用于等待

条件变量声明为pthread_cond_t数据类型,在<bits/pthreadtypes.h>中有具体的定义。

基本操作

条件变量初始化和销毁

/* Initialize condition variable  */
int pthread_cond_init (pthread_cond_t *__restrict __cond,   // 指向要初始化的变量的指针
__const pthread_condattr_t *__restrict __cond_attr) ;// 属性

/* Destroy condition variable */
int pt<span style="font-size:12px;">hread_cond_destroy (pthread_cond_t *__cond);</span>
上面两个函数分别由于条件变量的初始化和销毁。

和互斥量的初始化一样,如果条件变量是静态分配的,可以通过常量进行初始化,如下:

<span style="font-size:12px;">pthread_cond_t mlock = PTHREAD_COND_INITIALIZER;</span>
也可以通过pthread_cond_init()进行初始化,对于动态分配的条件变量由于不能直接赋值进行初始化,就只能采用这种方式进行初始化。那么当不在需要使用条件变量时,需要调用pthread_cond_destroy()销毁该条件所占用的资源。

条件变量的属性设置

/* 初始化条件变量属性对象  */
int pthread_condattr_init (pthread_condattr_t *__attr);

/* 销毁条件变量属性对象  */
int pthread_condattr_destroy (pthread_condattr_t *__attr);

/* 获取条件变量属性对象在进程间共享与否的标识  */
int pthread_condattr_getpshared (__const pthread_condattr_t * __restrict __attr,
int *__restrict __pshared);

/* 设置条件变量属性对象,标识在进程间共享与否 */
int pthread_condattr_setpshared (pthread_condattr_t *__attr, int __pshared) ;


这个属性的设置和互斥量属性设置是一样的,具体使用可以参考互斥量的用法:互斥量的属性设置。

条件变量的申请使用

/<span style="font-family:SimSun;font-size:12px;">*  等待条件变为真 */
int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);

/* 限时等待条件为真 */
int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
__const struct timespec *__restrict __abstime);

/* 唤醒一个等待条件的线程.  */
int pthread_cond_signal (pthread_cond_t *__cond);

 /* 唤醒等待该条件的所有线程 */
int pthread_cond_broadcast (pthread_cond_t *__cond);</span>
(1)pthread_cond_wait()函数用于等待条件被触发。该函数传入两个参数,一个条件变量一个互斥量,函数将条件变量和互斥量进行关联,互斥量对该条件进行保护,传入的互斥量必须是已经锁住的。调用pthread_cond_wait()函数后,会原子的执行以下两个动作:

将调用线程放到等待条件的线程列表上,即进入睡眠;
对互斥量进行解锁;

由于这两个操作时原子操作,这样就关闭了条件检查和线程进入睡眠等待条件改变这两个操作之间的时间通道,这样就不会错过任何条件的变化。

当pthread_cond_wait()返回后,互斥量会再次被锁住。

(2)pthread_cond_timedwait()函数和pthread_cond_wait()的工作方式相似,只是多了一个等待时间。等待时间的结构为struct timespec,

<span style="font-size:12px;">    struct timespec{
time_t  tv_sec    //Seconds.
long    tv_nsec   //Nanoseconds.
};  </span>


函数要求传入的时间值是一个绝对值,不是相对值,例如,想要等待3分钟,必须先获得当前时间,然后加上3分钟。

要想获得当前系统时间的timespec值,没有直接可调用的函数,需要通过调用gettimeofday函数获取timeval结构,然后转换成timespec结构,转换公式就是:

<span style="font-size:12px;">timeSpec.tv_sec = timeVal.tv_sec;
timeSpec.tv_nsec = timeVal.tv_usec * 1000;</span>

所以要等待3分钟,timespec时间结构的获得应该如下所示:
<span style="font-size:12px;">    struct timeval now;
struct timespec until;
gettimeofday(&now);//获得系统当前时间

//把时间从timeval结构转换成timespec结构
until.tv_sec = now.tv_sec;
until.tv_nsec = now.tv_usec * 1000;

//增加min
until.tv_sec += 3 * 60;  </span>


如果时间到后,条件还没有发生,那么会返回ETIMEDOUT错误。

从pthread_cond_wait()和pthread_cond_timewait()成功返回时,线程需要重新计算条件,因为其他线程可能在运行过程中已经改变条件。

(3)pthread_cond_signal() & pthread_cond_broadcast()

这两个函数都是用于向等待条件的线程发送唤醒信号,pthread_cond_signal()函数只会唤醒等待该条件的某个线程,pthread_cond_broadcast()会广播条件状态的改变,以唤醒等待该条件的所有线程。例如多个线程只读共享资源,这是可以将它们都唤醒。

这里要注意的是:一定要在改变条件状态后,再给线程发送信号。

考虑条件变量信号单播发送和广播发送的一种候选方式是坚持使用广播发送。只有在等待者代码编写确切,只有一个等待者需要唤醒,且唤醒哪个线程无所谓,那么此时为这种情况使用单播,所以其他情况下都必须使用广播发送。

使用示例

下面的程序使用条件变量和互斥锁实现生产者消费者问题.整个临时存储空间为2.因此,如果临时空间已满,则阻塞生产进程,若果没有产品则阻塞消费者线程.

应用程序申请了三个对象:

1)一个互斥锁对象,配合条件变量使用

2)一个空间非空条件变量,消费者线程在空间没有产品时等待这一条件变量;生产者在生成产品后通知此变量

3)一个空间非满条件变量,生成线程在空间满时等待着这一条件变量;消费者线程在消费产品后通知此变量

生产线流程如下:

1.申请互斥锁,如果互斥锁锁定,阻塞等待

2.测试空间是否已满

3.如条件满足(非满),执行操作,完成后解锁互斥锁

4.如果第二步不满足,阻塞,等待非满的条件变量

消费者线程如下:

1.申请互斥锁,如果已锁定,阻塞等待

2.测试空间是否为空

3.满足非空,则执行操作,完成后解锁

4.若第二步不满足,阻塞当前进程,等待空间非空变量.

源代码如下:

/*************************************************************************
> File Name: pthread_cond_exp.c
> Author:SuooL
> Mail:1020935219@qq.com || hu1020935219@gmail.com
> Website:http://blog.csdn.net/suool | http://suool.net > Created Time: 2014年08月15日 星期五 13时27分13秒
> Description:
************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>

#define BUFFER_SIZE 2

/* Circular buffer of integers. */
struct prodcons
{
int buffer[BUFFER_SIZE];      /* the actual data */
pthread_mutex_t lock;         /* mutex ensuring exclusive access to buffer */
int readpos, writepos;        /* positions for reading and writing */
pthread_cond_t notempty;      /* signaled when buffer is not empty */
pthread_cond_t notfull;       /* signaled when buffer is not full */
};

/* Initialize a buffer */
void init(struct prodcons *prod)
{
pthread_mutex_init(&prod->lock,NULL);     // 初始化互斥锁
pthread_cond_init(&prod->notempty,NULL);  // 初始化条件变量
pthread_cond_init(&prod->notfull,NULL);   // ....
prod->readpos = 0;
prod->writepos = 0;                        // 初始化读写操作位置
}
/* Store an integer in the buffer */
void put(struct prodcons * prod, int data)   // 输入产品子函数
{
pthread_mutex_lock(&prod->lock);       // 锁定互斥锁
/* Wait until buffer is not full */
while ((prod->writepos + 1) % BUFFER_SIZE == prod->readpos)  // 测试空间是否满
{
printf("producer wait for not full\n");
pthread_cond_wait(&prod->notfull, &prod->lock);  // 等待空间有空
}
/* Write the data and advance write pointer */
prod->buffer[prod->writepos] = data;     // 写数据
prod->writepos++;                        // 写位置加一
if (prod->writepos >= BUFFER_SIZE)       // 如写到尾部,返回
prod->writepos = 0;
/*Signal that the buffer is now not empty */
pthread_cond_signal(&prod->notempty); // 发送有数据信号
pthread_mutex_unlock(&prod->lock);          // 解锁
}
/* Read and remove an integer from the buffer */
int get(struct prodcons *prod)
{
int data;
pthread_mutex_lock(&prod->lock);       // 锁定
/* Wait until buffer is not empty */
while (prod->writepos == prod->readpos)           // 测试是否有数据
{
printf("consumer wait for not empty\n");
pthread_cond_wait(&prod->notempty, &prod->lock); // 空则等待
}
/* Read the data and advance read pointer */
data = prod->buffer[prod->readpos];     // 读数据
prod->readpos++;                        // 读位置加一
if (prod->readpos >= BUFFER_SIZE)
prod->readpos = 0;          // 读到尾部,返回
/* Signal that the buffer is now not full */
pthread_cond_signal(&prod->notfull);  // 通知非满
pthread_mutex_unlock(&prod->lock);      // 解锁
return data;
}

#define OVER (-1)
struct prodcons buffer;
/*-------------------------------生产者-----------------------*/
void * producer(void * data)
{
int n;
for (n = 0; n < 5; n++)         //生产前五个产品
{
printf("producer sleep 1 second......\n");
sleep(1);      // 每秒产一个
printf("put the %d product\n", n);
put(&buffer, n);
}
for(n=5; n<10; n++)          // 生产后五个
{
printf("producer sleep 3 second......\n");
sleep(3);              // 每三秒产三个
printf("put the %d product\n",n);
put(&buffer,n);
}
put(&buffer, OVER);
printf("producer stopped!\n");
return NULL;
}
/*--------------------------消费者----------------------------*/
void * consumer(void * data)
{
int d=0;
while (1)
{
printf("consumer sleep 2 second......\n");
sleep(2);             // 每2秒消费一个
d=get(&buffer);
printf("get the %d product\n", d);
//		d = get(&buffer);
if (d == OVER ) break;
}
printf("consumer stopped!\n");
return NULL;
}
/*----------------------------生产者--------------------------*/
int main(int argc,char *argv[])
{
pthread_t th_a, th_b;       // 线程定义
void * retval;
init(&buffer);
pthread_create(&th_a, NULL, producer, 0);     // 创建生产线程
pthread_create(&th_b, NULL, consumer, 0);       //  消费线程
/* Wait until producer and consumer finish. */
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);              // 等待线程结束
return 0;
}


结果如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: