您的位置:首页 > 理论基础 > 计算机网络

《unix网络编程》生产者消费者问题

2013-08-07 20:35 127 查看
首先是多个生产者单个消费者问题:

/* include main */
#include <semaphore.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
/* Number of slots in the circular buffer shared.buff[]; also the initial
 * count given to the nempty semaphore in main(). */
#define	NBUFF	 	 10
/* Capacity of the per-producer arrays (count[], tid_produce[]) in main(). */
#define	MAXNTHREADS	100

/* Set once in main() from the command line before any thread is created. */
int		nitems, nproducers;		/* read-only by producer and consumer */

struct {	/* data shared by producers and consumer */
int	buff[NBUFF];	/* circular buffer, indexed by item number % NBUFF */
int	nput;		/* number of items stored so far; next slot is nput % NBUFF */
int	nputval;	/* next value a producer stores into buff[] */
/* mutex is initialised to 1, nempty to NBUFF and nstored to 0 in main(). */
sem_t	mutex, nempty, nstored;		/* semaphores, not pointers */
} shared;

/* Thread start routines, defined after main(). */
void	*produce(void *), *consume(void *);

/*
 * Entry point of the multiple-producer / single-consumer demo.
 *
 * Usage: prodcons3 <#items> <#producers>
 *
 * Parses the item and producer counts, initialises the three semaphores,
 * starts one consumer and nproducers producers, waits for all of them and
 * prints how many items each producer stored.
 *
 * Exits with status 1 on bad arguments or if a semaphore or thread cannot
 * be created; exits with status 0 otherwise.
 */
int
main(int argc, char **argv)
{
    int         i, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3) {
        /* Must stop here: argv[1] / argv[2] are not valid otherwise. */
        fprintf(stderr, "usage: prodcons3 <#items> <#producers>\n");
        exit(1);
    }
    nitems = atoi(argv[1]);
    nproducers = atoi(argv[2]);

    /* Without at least one producer the consumer would wait forever. */
    if (nitems < 0 || nproducers < 1) {
        fprintf(stderr, "prodcons3: need #items >= 0 and #producers >= 1\n");
        exit(1);
    }
    /* count[] and tid_produce[] only have room for MAXNTHREADS entries. */
    if (nproducers > MAXNTHREADS)
        nproducers = MAXNTHREADS;

    /* initialize three semaphores */
    if (sem_init(&shared.mutex, 0, 1) != 0 ||
        sem_init(&shared.nempty, 0, NBUFF) != 0 ||
        sem_init(&shared.nstored, 0, 0) != 0) {
        perror("sem_init");
        exit(1);
    }

    /* create all producers and one consumer */
    pthread_setconcurrency(nproducers + 1);

    if (pthread_create(&tid_consume, NULL, consume, NULL) != 0) {
        fprintf(stderr, "prodcons3: cannot create consumer thread\n");
        exit(1);
    }
    for (i = 0; i < nproducers; i++) {
        count[i] = 0;
        if (pthread_create(&tid_produce[i], NULL, produce, &count[i]) != 0) {
            /* Joining an uninitialised thread id later would be undefined. */
            fprintf(stderr, "prodcons3: cannot create producer thread %d\n", i);
            exit(1);
        }
    }

    /* wait for all producers and the consumer */
    for (i = 0; i < nproducers; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    pthread_join(tid_consume, NULL);

    sem_destroy(&shared.mutex);
    sem_destroy(&shared.nempty);
    sem_destroy(&shared.nstored);
    exit(0);
}
/* end main */

/* include produce */
/*
 * Producer thread body.
 *
 * arg points to this thread's int counter, which is incremented once for
 * every item the thread stores. Always returns NULL.
 */
void *
produce(void *arg)
{
    int *produced = arg;
    int  slot;

    while (1) {
        sem_wait(&shared.nempty);       /* need a free slot */
        sem_wait(&shared.mutex);

        if (shared.nput < nitems) {
            slot = shared.nput % NBUFF;
            shared.buff[slot] = shared.nputval;
            printf("put: buff[%d]=%d\n", slot, shared.nputval);
            shared.nput += 1;
            shared.nputval += 1;

            sem_post(&shared.mutex);
            sem_post(&shared.nstored);  /* one more item available */
            ++*produced;
            continue;
        }

        /*
         * Everything has been produced. This thread did not fill a slot, so
         * it hands the nempty token back; otherwise producers still blocked
         * in sem_wait(&shared.nempty) could never wake up to notice that
         * production is finished.
         */
        sem_post(&shared.nempty);
        sem_post(&shared.mutex);
        break;
    }
    return NULL;
}
/* end produce */

/* include consume */
void *
consume(void *arg)
{
int		i;

for (i = 0; i < nitems; i++) {
sem_wait(&shared.nstored);		/* wait for at least 1 stored item */
sem_wait(&shared.mutex);

if (shared.buff[i % NBUFF] != i)
printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
printf("get: buff[%d]=%d\n",i % NBUFF,i);
// else printf("buff[%d] ", i);
sem_post(&shared.mutex);
sem_post(&shared.nempty);		/* 1 more empty slot */
}
return(NULL);
}
/* end consume */


 

多个生产者共同生产定量的物品;当所有物品都生产完后,之后再进入临界区的生产者线程就应该直接 return。

唯一的消费者只需从头到尾按顺序取出物品。

接下来是多生产者多消费者:

/* include globals */
#include <semaphore.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>

/* Number of slots in the circular buffer shared.buff[]; also the initial
 * count given to the nempty semaphore in main(). */
#define	NBUFF	 	 10
/* Capacity of the per-thread arrays (counters and thread ids) in main(). */
#define	MAXNTHREADs	100

/* Set once in main() from the command line before any thread is created. */
int		nitems, nproducers, nconsumers;		/* read-only */

struct {	/* data shared by producers and consumers */
int	buff[NBUFF];	/* circular buffer, indexed by item number % NBUFF */
int	nput;			/* item number: 0, 1, 2, ... */
int	nputval;		/* value to store in buff[] */
int	nget;			/* item number: 0, 1, 2, ... */
int	ngetval;		/* value fetched from buff[] */
/* mutex is initialised to 1, nempty to NBUFF and nstored to 0 in main(). */
sem_t	mutex, nempty, nstored;		/* semaphores, not pointers */
} shared;

/* Thread start routines, defined after main(). */
void	*produce(void *), *consume(void *);
/* end globals */

/* include main */
/*
 * Entry point of the multiple-producer / multiple-consumer demo.
 *
 * Usage: prodcons4 <#items> <#producers> <#consumers>
 *
 * Parses the three counts, initialises the three semaphores, starts all
 * producers and consumers, waits for them and prints how many items each
 * thread handled.
 *
 * Exits with status 1 on bad arguments or if a semaphore or thread cannot
 * be created; exits with status 0 otherwise.
 */
int
main(int argc, char **argv)
{
    int         i, prodcount[MAXNTHREADs], conscount[MAXNTHREADs];
    pthread_t   tid_produce[MAXNTHREADs], tid_consume[MAXNTHREADs];

    if (argc != 4) {
        /* Must stop here: argv[1..3] are not valid otherwise. */
        fprintf(stderr, "usage: prodcons4 <#items> <#producers> <#consumers>\n");
        exit(1);
    }
    nitems = atoi(argv[1]);
    nproducers = atoi(argv[2]);
    nconsumers = atoi(argv[3]);

    /*
     * Consumers are only released by producers, and producers only get
     * slots back from consumers, so both sides need at least one thread.
     */
    if (nitems < 0 || nproducers < 1 || nconsumers < 1) {
        fprintf(stderr,
                "prodcons4: need #items >= 0, #producers >= 1 and #consumers >= 1\n");
        exit(1);
    }
    /* The per-thread arrays only have room for MAXNTHREADs entries. */
    if (nproducers > MAXNTHREADs)
        nproducers = MAXNTHREADs;
    if (nconsumers > MAXNTHREADs)
        nconsumers = MAXNTHREADs;

    /* initialize three semaphores */
    if (sem_init(&shared.mutex, 0, 1) != 0 ||
        sem_init(&shared.nempty, 0, NBUFF) != 0 ||
        sem_init(&shared.nstored, 0, 0) != 0) {
        perror("sem_init");
        exit(1);
    }

    /* create all producers and all consumers */
    pthread_setconcurrency(nproducers + nconsumers);
    for (i = 0; i < nproducers; i++) {
        prodcount[i] = 0;
        if (pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]) != 0) {
            /* Joining an uninitialised thread id later would be undefined. */
            fprintf(stderr, "prodcons4: cannot create producer thread %d\n", i);
            exit(1);
        }
    }
    for (i = 0; i < nconsumers; i++) {
        conscount[i] = 0;
        if (pthread_create(&tid_consume[i], NULL, consume, &conscount[i]) != 0) {
            fprintf(stderr, "prodcons4: cannot create consumer thread %d\n", i);
            exit(1);
        }
    }

    /* wait for all producers and all consumers */
    for (i = 0; i < nproducers; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("producer count[%d] = %d\n", i, prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++) {
        pthread_join(tid_consume[i], NULL);
        printf("consumer count[%d] = %d\n", i, conscount[i]);
    }

    sem_destroy(&shared.mutex);
    sem_destroy(&shared.nempty);
    sem_destroy(&shared.nstored);
    exit(0);
}
/* end main */

/* include produce */
/*
 * Producer thread body (multiple consumers variant).
 *
 * arg points to this thread's int counter, which is incremented once for
 * every item the thread stores. Always returns NULL.
 */
void *
produce(void *arg)
{
    int *produced = arg;
    int  slot;

    while (1) {
        sem_wait(&shared.nempty);       /* need a free slot */
        sem_wait(&shared.mutex);

        if (shared.nput < nitems) {
            slot = shared.nput % NBUFF;
            shared.buff[slot] = shared.nputval;
            printf("put: buff[%d]=%d\n", slot, shared.nputval);
            shared.nput += 1;
            shared.nputval += 1;

            sem_post(&shared.mutex);
            sem_post(&shared.nstored);  /* one more item available */
            ++*produced;
            continue;
        }

        /*
         * Everything has been produced. Post nstored so a consumer blocked
         * in sem_wait(&shared.nstored) can wake up and terminate, and hand
         * back the nempty token this thread took without filling a slot.
         */
        sem_post(&shared.nstored);
        sem_post(&shared.nempty);
        sem_post(&shared.mutex);
        break;
    }
    return NULL;
}
/* end produce */

/* include consume */
void *
consume(void *arg)
{
int		i;

for ( ; ; ) {
sem_wait(&shared.nstored);	/* wait for at least 1 stored item */
sem_wait(&shared.mutex);

if (shared.nget >= nitems) {
sem_post(&shared.nstored);
sem_post(&shared.mutex);
return(NULL);			/* all done */
}

i = shared.nget % NBUFF;
if (shared.buff[i] != shared.ngetval)
printf("error: buff[%d] = %d\n", i, shared.buff[i]);
printf("get: buff[%d]=%d\n",i % NBUFF,i);
shared.nget++;
shared.ngetval++;

sem_post(&shared.mutex);
sem_post(&shared.nempty);	/* 1 more empty slot */
*((int *) arg) += 1;
}
}
/* end consume */

需要注意的是,produce 函数在“全部生产完毕”的分支里必须有这一句(原文代码清单的第80行):sem_post(&shared.nstored);

当所有物品都生产完后,让每个生产者线程在退出前给 nstored 加 1,以解除各个消费者线程的阻塞。不然消费者线程将永远阻塞在 sem_wait(&shared.nstored) 上。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: