您的位置:首页 > 其它

剖析生产者和消费者模型

2017-05-29 08:43 267 查看
  一.什仫是生产者消费者模型?

      什仫是生产者消费者模型呢?我们知道在实际的软件开发过程中,经常会遇到这样的情况:某个模块负责产生数据,这些数据由另外一个模块进行处理(这里的模块是广义的,可以是类,函数,进程和线程等),这里负责产生数据的模块就是生产者,处理数据的模块就是消费者。但是只有生产者和消费者是远远不够的,还需要一个缓冲区介于生产者和消费者之间,作为中介。生产者把数据放入缓冲区,消费者把数据从缓冲区中取走。生产者消费者模型是操作系统中一种重要的模型,它描述的是一种通知等待机制。

      


 
二.生产者和消费者的关系和特点


      对于生产者和消费者模型必须具备的条件,用一句话来说就是:一个交易场所,两种角色,三种关系。

      一个交易场所:交易场所指的是生产者和消费者之间进行数据交换的仓库,这块仓库相当于一个缓冲区,生产者负责把数据放入到缓冲区中,消费者负责把缓冲区中的数据取出来; 

       两种角色:两种角色就是生产者和消费者;

       三种关系:三种关系主要就是生产者与生产者之间,消费者与消费者之间,生产者和消费者之间的关系;

       (1).生产者与生产者之间是互斥关系

       (2).消费者与消费者之间是互斥关系

       (3).生产者与消费者之间是互斥和同步的关系

  三.生产者和消费者模型的特点

       (1).生产者只需要关心的是缓冲区是否为空,为空则说明此时生产者可以往缓冲区中存放数据,不需要关心具体的消费者;

       (2).消费者只需要关心的是缓冲区是否存在数据,若缓冲区存在数据则消费数据即可,而不需要关心具体的生产者;

       (3).生产者生产数据的时候,消费者不能"消费"数据,消费者消费数据的时候生产者不能生产数据,这就相当于一种互斥关系,在任一时间段内只能有一种角色访问缓冲区;

       (4).缓冲区为空时不能进行消费;

       (5).缓冲区为满时不能进行生产;

   四.为什仫要存在缓冲区?

       我们知道生产者消费者模型的"123"原则,那仫为什仫要存在缓冲区呢?为什仫不让生产者直接调用消费者的函数,直接把数据传递过去呢?存在缓冲区有以下几个特点:

      (1).解耦

        假设生产者和消费者是两个类,如果让生产者直接调用消费者的某个函数,那么生产者和消费者之间就会产生依赖(耦合)。如果此时消费者的代码发生变化可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合性也就降低了。

      (2).支持并发

       生产者直接调用消费者的某个方法,还有一个弊端。由于函数调用是同步的(或者称作阻塞的),在消费者的方法没有返回之前,生产者只好一直阻塞,如果消费者处理数据很慢,则生产者就会白白浪费时间。使用生产者和消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据放到缓冲区,就可以继续生产下一个数据,而不必依赖消费者的处理速度了。

      (3).支持忙闲不均

      缓冲区还有一个好处就是,如果制造数据的速度时快时慢,缓冲区的好处就体现出来了,当数据制造快的时候,消费者来不及处理,未处理的数据就可存放到缓冲区中。等待生产者的速度慢下来之后,消费者再慢慢处理。

 
  五.一个简单的关于生产者和消费者的例子


      缓冲区是用带头结点的单链表模拟的,生产者线程往链表中头插结点,消费者线程从链表中头删结点,在代码的处理过程中用到了互斥锁(解决互斥)和条件变量(解决同步)。

      #include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

pthread_mutex_t mylock=PTHREAD_MUTEX_INITIALIZER; //互斥锁
pthread_cond_t mycond=PTHREAD_COND_INITIALIZER; //条件变量--同步

typedef struct node_list //一个交易场所
{
int _data;
struct node_list *_next;
}node,*node_p,**node_pp;

node_p head;

static node_p alloc_node(int data)
{
node_p NewNode=(node_p)malloc(sizeof(node));
if(NewNode == NULL)
{
perror("malloc");
exit(1);
}
NewNode->_next=NULL;
NewNode->_data=data;
return NewNode;
}

static void free_node(node_p del)
{
if(del)
{
free(del);
del=NULL;
}
}

void initList(node_pp h)
{
*h=alloc_node(0);
}

void pushHead(node_p h,int data)
{
node_p NewNode=alloc_node(data);
NewNode->_next=h->_next;
h->_next=NewNode;
}

int IsEmpty(node_p h)
{
return h->_next == NULL ? 1: 0;
}

void popHead(node_p h,int *data)
{
if(!IsEmpty(h))
{
node_p del=h->_next;
h->_next=del->_next;
*data=del->_data;
free_node(del);
}
}

void showList(node_p h)
{
node_p cur=h->_next;
while(cur)
{
printf("%d ",cur->_data);
cur=cur->_next;
}
printf("\n");
}

void DestroyList(node_p h)
{
int data=0;
if(!IsEmpty(h))
{
popHead(h,&data);
}
free_node(h);
}

void *product_run(void *arg) //生产者往链表中头插数据
{
int data=0;
node_p h=(node_p)arg;
while(1)
{
usleep(1000000);
data=rand()%1000;
pthread_mutex_lock(&mylock); //加锁
pushHead(h,data);
pthread_mutex_unlock(&mylock); //解锁
pthread_cond_signal(&mycond); //唤醒等待的线程
printf("product is done...\n");
}
}

void *consumer_run(void *arg) //消费者从链表中头删数据
{
int data=0;
node_p h=(node_p)arg;
while(1)
{
pthread_mutex_lock(&mylock); //加锁
if(IsEmpty(h)) //当交易场所为空的时候,等待并归还所获得的锁
{
pthread_cond_wait(&mycond,&mylock);
}
popHead(h,&data);
pthread_mutex_unlock(&mylock);//解锁
printf("consumer is done,data is %d\n",data);
}
}

int main()
{
initList(&head);
pthread_t product;
pthread_t consumer;
pthread_create(&product,NULL,product_run,(void *)head);
pthread_create(&consumer,NULL,consumer_run,(void *)head);
pthread_join(product,NULL);
pthread_join(consumer,NULL);
DestroyList(head);

pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
// int i=0;
// for(;i<10;i++)
// {
// pushHead(head,i);
// showList(head);
// sleep(1);
// }
// int data=0;
// for(;i>5;i--)
// {
// popHead(head,&data);
// showList(head);
// sleep(1);
// }
// DestroyList(head);
return 0;
}

     
点击此处查看源码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息