您的位置:首页 > 运维架构 > Linux

Linux多线程编程(三)-----生产者与消费者(条件变量,信号量)

2017-02-22 21:02 381 查看
Linux多线程编程(一):http://blog.csdn.net/llzk_/article/details/55670172

Linux多线程编程(二):http://blog.csdn.net/llzk_/article/details/55805851

在前两篇文章中我们探讨了线程的概念,同步与互斥概念以及互斥锁。本文将剖析一种操作系统中重要的模型—–生产者与消费者。其中会涉及条件变量,互斥锁,信号量等概念。

概念

我认为,生产者消费者模型主要是用来解决资源的供求问题。生产者是资源的提供方,消费者是资源的需求方。此为两种角色

生产者与消费者之间是供求关系

两个生产者之间是竞争关系(互斥)

两个消费者之间也是竞争关系(互斥)

此为三种关系

生产者与消费者进行交易,那必然需要一个交易场所。在现实生活中可以是饭店,商场等等。此为一个场所

记住这“3 2 1”规则,会为你理解这个模型提供很大的帮助。

下面我们举一个例子来进一步理解一下生产者与消费者间的关系。假定现在有一个商场。生产厂商为这个商场提供,我们去商场买商品。所以生产厂商是生产者,我们是消费者,商场是交易场所,商品就是连接关系的资源。

当生产厂商为商场供货时,因为一些黑幕交易,我们此时是不能进商场消费的。

当我们在商场消费时,厂商因为没有“安全”的环境,是不来供货的。

生产者(厂商)与消费者(我们)之间是互斥关系。不能同时访问交易场所。

但是也需要一种规则,来协同厂商与我们访问商场的时间。所以生产者(厂商)与消费者(我们)之间也是同步关系。(同步指协同)

条件变量

在理解了上面的同步与互斥之后,我们还要解决一个问题。假如你大老远跑来商场买东西,发现货架是空的,生产者并没有提供任何商品给你买。那你会不会很失望?

为了避免这种事情的发生,我们就需要条件变量来帮助我们。当有商品时,会通知你“有辣条了,快来买吧!”。当没有商品时,就一直等待。

定义

条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待“条件变量的条件成立”;另一个线程使“条件成立”(给出条件成信号)。所以条件变量使我们可以睡眠等待某种条件出现。

为了防止竞争,条件变量的使用总是和一个互斥锁进行搭配。

函数接口

以下所有函数接口的头文件都是

#include<pthread.h>


条件变量的数据类型

pthread_cond_t


条件变量初始化

条件变量初始化有两种方法:

①直接定义一个全局的条件变量,并利用宏PTHREAD_COND_INITIALIZER进行值得初始化。

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;


②调用函数pthread_cond_init

pthread_cond_t (pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);


第一个参数即为我们动态分配的条件变量cond,除非创建一个非默认属性的条件变量,否则第二个参数attr始终为NULL;

注意:若不想讲条件变量定义成全局的,必须以动态分配的方式创建。

pthread_cond_t *cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));


注意:使用此种方式,先destroy(看下文)条件变量,再free这块空间。

销毁条件变量

我们有创建条件变量,那必须得有销毁。调用pthread_cond_destroy函数销毁条件变量。

int pthread_cond_destroy(pthread_cond_t* cond);


参数cond指针即指向我们创建的条件变量。

等待

我们使用pthread_cond_wait或pthread_cond_timewait函数等待条件变量变为真。

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);


我们先说pthread _cond_wait,其第一个参数是指向条件变量的指针,第二个参数是一个指向互斥锁的指针。在上面提到过,条件变量总是和一把互斥锁协同使用的。目的是为了防止资源的竞争。这里的资源暂且理解为商场。(关于互斥锁不懂的可参照文章开头第二个链接)

生产者与消费者之间是互斥关系。不能同时访问交易场所,所以我们加一把锁来协同两者。

我们不难猜出这个锁mutex在这里的含义,假设此时有两个消费者A,B都在等待资源(等待条件变量变为真)。此时生产者申请到了”锁”,并且生产了一个“商品”。紧接着释放了锁资源,并发送信号告诉消费者:“你们可以来买了”。

消费者A率先抢到了锁,并且买走了这个“商品”。之后消费者B申请到了锁,进入商场发现已经没有商品了,此时条件变量为假。消费者B会一直等到条件变量为真。但是此时锁在B身上,如果B不释放锁,并且一直等待的话,生产者因为申请不到锁而无法生产“商品”,消费者B因为没有商品而一直等待。就会造成“死锁”问题。说了这么多,wait函数的第二个参数就是为了当消费者在申请到锁,但是条件变量为假(没有锁资源)的情况下及时的释放锁资源

wait函数是无条件等待。在条件变量为假时,会一直等下去。timedwait是有条件等待,它多定义了一个超时,超时值定义了我们愿意等待多长时间。它通过timespec决定。

发送信号

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);


当生产者生产完毕后需要通知消费者。发送信号有两种方式。signal是根据某种优先级唤醒一个等待者。broadcast是在资源充足的情况下进行广播,唤醒所有等待者。

条件变量与互斥锁搭配示例代码

因为代码引用了pthread库中的函数,所以编译时要引入库pthread。如:

gcc -o test test.c -lpthread


代码:

首先我写了一个链表作为交易场所。

typedef struct ListNode
{
int data;
struct ListNode* _next;
}Node;

Node* creatNode(int data)
{
Node* newNode = (Node*)malloc(sizeof(Node));
if(newNode == NULL)
{
perror("malloc error\n");
return NULL;
}
newNode->data = data;
newNode->_next = NULL;
return newNode;
}

void InitList(Node** head)
{
*head = creatNode(0);
}

void Push_head(Node* head,int data)
{
assert(head);
Node* newNode = creatNode(data);
newNode->_next = head->_next;
head->_next = newNode;
return;
}

int IsEmpty(Node* head)
{
assert(head);
if(head->_next == NULL)
{
return 0;
}
else
{
return 1;
}
}

void DelNode(Node* del)
{
assert(del);
free(del);
del = NULL;
return;
}

void Pop_head(Node* head,int *data)
{
assert(head);
if(IsEmpty(head) != 0)
{
Node* del = head->_next;
head->_next = del->_next;
*data = del->data;
DelNode(del);
return;
}
printf("List if Empty!\n");
return;
}

void DestroyList(Node* head)
{
assert(head);
Node* cur = head->_next;
while(cur)
{
Pop_head(head,NULL);
cur = cur->_next;
}
return;
}

void ShowList(Node* head)
{
assert(head);
Node* cur = head->_next;
while(cur)
{
printf("%d ",cur->data);
cur = cur->_next;
}
printf("\n");
return;
}


下面是主函数和生产者消费者代码:

//创建互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//创建条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* thread_producer(void* arg)
{
Node* head = (Node*)arg;
while(1)
{
int data = rand()%100000;
pthread_mutex_lock(&mutex);
usleep(10000);
//sleep(1);
Push_head(head,data);
pthread_mutex_unlock(&mutex);
printf("Producer sell : %d\n",data);
pthread_cond_signal(&cond);
}
return NULL;
}
void* thread_consumer(void *arg)
{
Node* head = (Node*)arg;
while(1)
{
int data = 0;
pthread_mutex_lock(&mutex);
//sleep(1);
while(IsEmpty(head) == 0)
{
pthread_cond_wait(&cond,&mutex);
}
Pop_head(head,&data);
pthread_mutex_unlock(&mutex);
printf("Consumer get : %d\n",data);
}
return NULL;
}

int main()
{

Node* head;
InitList(&head);
pthread_t tid1,tid2;
//生产者线程
pthread_create(&tid1,NULL,thread_producer,(void*)head);
//消费者线程
pthread_create(&tid2,NULL,thread_consumer,(void*)head);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
//销毁互斥锁
pthread_mutex_destroy(&mutex);
//销毁条件变量
pthread_cond_destroy(&cond);
return 0;
}


信号量(多元)

除了上面那种方式外我们还可以用信号量实现生产者消费者之间的同步。我们将上面的商场模型再具体化一下,假如,商场的货架只有五个摆放存放商品的格子。每个格子只能放一件商品。这五个格子可以循环利用。可以把其想成一个环形队列。



当有空格子时,生产者就可以生产商品。并且要按顺序访问格子1,2,3,4,5。

当有商品时,消费者就能消费。也是按顺序访问。

生产者与消费者在环形队列下需要遵循以下规则:生产者优先,消费者的永远不能追上生产者,生产者也不能超过消费者一圈。(可以想象成两个人赛跑)

可以看出,生产者关注的是格子资源,消费者关注的是商品资源。我们就可以定义两个信号量来分别代表这两个资源。

函数接口

信号量的数据类型为sem_t,函数接口的头文件为semaphore.h。

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);


初始化信号量sem_init,参数value为信号量的值,参数pshared一般设为0,表示信号量用于同一进程内线程间同步。摧毁信号量sem_destroy。P操作(申请资源)sem_wait,使信号量的值-1。V操作(释放资源)sem_post,使信号量的值+1。sem_trywait是尝试申请资源。

使用信号量实现生—消模型代码

因为代码引用了pthread库中的函数,所以编译时要引入库pthread。如:

gcc -o test test.c -lpthread


代码:

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>

#define _SIZE_ 64

int ringbuf[_SIZE_];//环形队列

sem_t blanks;//表示格子的信号量
sem_t datas;//表示商品的信号量

//生产者
void* thread_producer(void* arg)
{
int i = 0;
while(1)
{
sleep(1);
sem_wait(&blanks);//生产者申请格子资源
int data = rand()%10000;
ringbuf[i] = data;
printf("Producer sell : %d\n",data);
i++;
i %=_SIZE_;
sem_post(&datas);//每生产一个商品就需要对商品信号量+1
}
}

//消费者
void* thread_consumer(void* arg)
{
int i = 0;
while(1)
{
sleep(1);
sem_wait(&datas); //消费者申请商品资源
printf("Consumer get : %d\n",ringbuf[i]);
i++;
i %= _SIZE_;
sem_post(&blanks);//每买走一个商品,就多了一个格子资源。
}
}

int main()
{

//将格子信号量初始化为64,将商品信号量初始化为0,以保证生产者优先。
sem_init(&blanks,0,_SIZE_);
sem_init(&datas,0,0);

pthread_t producer;
pthread_t consumer;
pthread_create(&producer,NULL,thread_producer,NULL);
pthread_create(&consumer,NULL,thread_consumer,NULL);
pthread_join(producer,NULL);
pthread_join(consumer,NULL);

sem_destroy(&blanks);
sem_destroy(&datas);
return 0;
}


思考

如果有多个消费者多个生产者又该怎么处理呢?

下面直接给出代码:

#include<stdio.h>
#include<stdlib.h>
#include<semaphore.h>
#include<pthread.h>

#define _SIZE_ 64

int ringbuf[_SIZE_];

pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER;

sem_t blanks;
sem_t datas;

void* thread_pro(void* arg)
{
while(1)
{
sleep(1);
sem_wait(&blanks);
pthread_mutex_lock(&lock1);
int data = rand()%10000;
int i = 0;
ringbuf[i] = data;
i++;
i %=_SIZE_;
pthread_mutex_unlock(&lock1);
sem_post(&datas);
printf("pro sell a number: %d\n",data);
}
}

void* thread_con(void* arg)
{
while(1)
{
sleep(1);
sem_wait(&datas);
pthread_mutex_lock(&lock2);
int i = 0;
pthread_mutex_unlock(&lock2);
sem_post(&blanks);
printf("con get a number: %d\n",ringbuf[i]);
i++;
i %=_SIZE_;
}
}

int main()
{
//index = 0;
//num = 0;
sem_init(&blanks,0,_SIZE_);
sem_init(&datas,0,0);

pthread_t pro1,pro2;
pthread_t con1,con2;
pthread_create(&pro1,NULL,thread_pro,NULL);
pthread_create(&pro2,NULL,thread_pro,NULL);
pthread_create(&con1,NULL,thread_con,NULL);
pthread_create(&con2,NULL,thread_con,NULL);
pthread_join(pro1,NULL);
pthread_join(pro2,NULL);
pthread_join(con1,NULL);
pthread_join(con2,NULL);

sem_destroy(&blanks);
sem_destroy(&datas);

pthread_mutex_destroy(&lock1);
pthread_mutex_destroy(&lock2);
//printf("hello\n");
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息