您的位置:首页 > 运维架构 > Linux

Linux _条件变量

2016-04-02 04:22 423 查看
条件变量

1. 问题

某些情况下,某些线程有这个需求:

仅当满足某个特定条件时,才执行对应操作;

如果该条件不满足,就阻塞该线程,一直等到对应的条件满足才继续执行。

解决方案:
当条件满足时,使用信号量唤醒对应线程,
当条件不满足时,使用信号量阻塞对应线程。
并用互斥量(互斥锁)来保护对该条件的访问。

Linux提供了一种更方便的机制来解决该类问题:条件变量


使用方法

条件变量是一种特殊的“通知”,而不是指某个条件。

当特定的条件满足时,就使用pthread_cond_signal发送该通知(即发送该条件)

即通知等待该条件的线程,它所等待的条件已经满足了。

当特定的条件还不满足时,就使用pthread_cond_wait来等待该通知(即等待该条件)

条件变量和互斥量结合使用:

(1) 等待”通知”的pthread_cond_wait和发送“通知”的pthread_cond_signal,

这两个调用的内部实现,需要使用互斥量,用来保护该条件变量。

(2) 用来判断条件是否满足的相关共享资源,也需要用该互斥量进行保护。


条件变量的使用接口

1) 条件变量的表示

类型:pthread_cond_t

2) 条件标量的初始化

编译时初始化:

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;

运行时初始化:

pthread_cond_t my_cond;

pthread_cond_init(&my_cond, NULL);

/* 参数2为NULL, 表示该条件变量使用默认属性 */

3) 等待条件标量

pthread_cond_wait

原型:int pthread_cond_wait (pthread_cond_t *cond,

pthread_mutex_t *mutex);

参数:cond, 条件变量

mutex, 该条件变量所使用的互斥量

pthread_cond_timedwait
原型: int  pthread_cond_timedwait(pthread_cond_t *cond,
pthread_mutex_t *mutex,
const struct timespec *abstime);
功能:pthread_cond_wait的限时等待版本
参数:abstime, 是一个绝对时间,即当前时间+超时时间

注意:当因等待该条件变量而使该线程阻塞时,隐含了一个动作(对该互斥量进行解锁)
当被唤醒时,即从该调用返回时,又隐含了一个动作(对该互斥量进行加锁)


4) “发送”该条件变量

即,通知(唤醒)等待该条件变量的线程。

如果没有线程在等待该条件变量,则忽视该操作,无累积效应。(而多次执行V操作,将有“累积效应”)

pthread_cond_signal
原型: int  pthread_cond_signal(pthread_cond_t  *cond);
功能: 如果有多个线程都在等待该条件变量,
则,使用调度策略唤醒一个线程,其余线程继续等待。

pthread_cond_broadcast
原型:int  pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒等待该条件变量的所有线程。


实例

生产者线程、消费者线程

最多可以同时存放BUFF_SIZE个“产品”

每个产品用一个整数表示。

即使用int buff[BUFF_SIZE]存放所有产品。

当buff放满时,不可以再生产。

当buff为空时,不可以再消费

分别调整生产者和消费者的速度,观察输出信息。

main6.c

4.

创建两个线程5

线程1接收用户输入

接收完成后,由线程2对该字符串进行“加工”,即统计其长度,并打印输出。

同步要求:
接收到用户输入后,才能统计字符串长度。
用户统计完成后,才能继续接收用户输入。


main7.c

------------------------------
lock
if (判断是否需要等待)
pthread_cond_wait(&cond, &lock)

work
unlock
-------------------------------
pthread_cond_signal(&cond);
_______________________________
thread1:
如果COUNT > 0 就执行work()
否则,等待直到该条件满足

pthread_mutex_lock(&lock);
if (!(COUNT > 0)) {
pthread_cond_wait(&conditon, &lock);
}
pthread_mutex_unlock(&lock);
work();

线程2:

//
COUNT++;

pthread_mutex_lock(&lock);
if(COUNT > 0) {
pthread_cond_signal(&condition)
}


main6.c

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>

#define BUFF_SIZE  3

int buff[BUFF_SIZE];
int pos_product;
int pos_consume;

// ∂®“ÂÃıº˛±‰¡øcond_product
pthread_cond_t cond_product;

// ∂®“ÂÃıº˛±‰¡øcond_consume
pthread_cond_t cond_consume;

// ∂®“Âœfl≥ê•≥‚À¯lock
pthread_mutex_t lock;

void init_work(void)
{
// Ãıº˛±‰¡øµƒ≥ı ºªØ
pthread_cond_init(&cond_product, NULL);
pthread_cond_init(&cond_consume, NULL);

// œfl≥ÃÀ¯µƒ≥ı ºªØ
pthread_mutex_init(&lock, NULL);

pos_product = 0;
pos_consume = 0;
}

void* handle_product(void *arg)
{
int i;

for(i=1; i<5; i++) {
pthread_mutex_lock(&lock);
if ((pos_product + 1) % BUFF_SIZE
== pos_consume) {
// ≤÷ø‚“—¬˙£¨”¶∏√◊Ë»˚ Ωµ»¥˝
printf("Buff is full, wait...\n");
pthread_cond_wait(&cond_product, &lock);       //µ±Ãıº˛±‰¡øŒ¥∑¢…˙∏ƒ±‰ ± £¨ Ω¯––µΩ’‚“ª≤Ωª·◊‘∂ØΩ‚À¯  £¨µ±Ãıº˛±‰¡ø¬˙◊„ «“
// À¯◊¥Ã¨Œ™Œ¥º”À¯◊¥Ã¨£¨◊‘∂غœÀ¯÷¥––œ¬“ª≤Ω
// »Ù À¯◊¥Ã¨Œ™ºœÀ¯◊¥Ã¨  £¨µ»¥˝∆‰À¯◊¥Ã¨Œ™Œ¥º”À¯  º”À¯∫Û÷¥–– œ¬“ª≤Ω
}
buff[pos_product] = i;
printf("Product a productor(%d)\n", i);

pos_product++;
if (pos_product >= BUFF_SIZE) {
pos_product = 0;
}

//pos_product = (pos_product+1)%BUFF_SIZE;

pthread_cond_signal(&cond_consume);
pthread_mutex_unlock(&lock);

printf("product sleep begin.\n");
sleep(1);
printf("product sleep end.\n");
}
}

void* handle_consume(void *arg)
{
int val;
int i;
for (i=1; i<5; i++) {
pthread_mutex_lock(&lock);
if (pos_consume == pos_product) {
/* ◊Ë»˚ Ωµ»¥˝ */
printf("Buff is empty, waiting...\n");
pthread_cond_wait(&cond_consume, &lock);
}

/* ¥”≤÷ø‚»°≥ˆ≤˙∆∑ */
val = buff[pos_consume];
printf("Consume a product. val = %d\n", val);

/* –fi∏ƒø…œ˚∑—≤˙∆∑µƒŒª÷√ */
pos_consume++;
if (pos_consume >= BUFF_SIZE) {
pos_consume = 0;
}

pthread_cond_signal(&cond_product);
pthread_mutex_unlock(&lock);

printf("consumer sleep...begin\n");
sleep(3);
printf("consumer sleep...end\n");
}
}

int main(void)
{
pthread_t th_product, th_consume;
int ret;

init_work();

ret = pthread_create(&th_product, 0, handle_product, 0);
if (ret != 0) {
perror("create thread failed!\n");
exit(1);
}

ret = pthread_create(&th_consume, 0, handle_consume, 0);
if (ret != 0) {
perror("create thread failed!\n");
exit(1);
}

pthread_join(th_product, 0);
pthread_join(th_consume, 0);

return 0;
}


main7.c

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <string.h>

#define BUFF_SIZE  80
char buff[BUFF_SIZE];
pthread_cond_t  cond_input;
pthread_cond_t  cond_work;
pthread_mutex_t lock;

void *hanle_input(void *arg)
{
int ret;
int fd;

fd = 0;
while(1) {
fd_set read_set;
FD_ZERO(&read_set);
FD_SET(fd, &read_set);

ret = select(fd+1, &read_set, 0, 0, 0);
if (ret == -1) {
perror("select failed!\n");
exit(1);
} else {
if (FD_ISSET(fd, &read_set)) {
pthread_mutex_lock(&lock);
if (buff[0] != '\0') {
pthread_cond_wait(&cond_input, &lock);
}
bzero(buff, sizeof(buff));
ret = read(fd, buff, sizeof(buff));
if (ret == -1) {
printf("read failed!\n");
exit(1);
}

pthread_cond_signal(&cond_work);
pthread_mutex_unlock(&lock);
}
}
}
}

void *hanle_work(void *arg)
{
while(1) {
pthread_mutex_lock(&lock);
if (buff[0] == '\0') {
pthread_cond_wait(&cond_work, &lock);
}

printf("You input %d characters\n", strlen(buff));
buff[0] = '\0';

pthread_cond_signal(&cond_input);
pthread_mutex_unlock(&lock);
}
}

static void init_work(void)
{
pthread_cond_init(&cond_input, NULL);
pthread_cond_init(&cond_work, NULL);
pthread_mutex_init(&lock, NULL);
}

int main(void)
{
int ret;
pthread_t th_input;
pthread_t th_work;

init_work();

ret = pthread_create(&th_input, 0, hanle_input, 0);
if (ret != 0) {
printf("create thread failed!\n");
}

ret = pthread_create(&th_work, 0, hanle_work, 0);
if (ret != 0) {
printf("create thread failed!\n");
}

pthread_join(th_input, 0);
pthread_join(th_work, 0);

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: