您的位置:首页 > 理论基础 > 计算机网络

《UNIX网络编程 卷2》 笔记: 互斥锁与条件变量

2017-09-06 21:17 225 查看
从本节开始后续文章都讨论同步,本节主要讨论互斥锁与条件变量。互斥锁与条件变量之前在 《UNIX网络编程 卷1》笔记这一节有介绍,然而那一节并没有讲到临界区的概念。临界区是指被互斥锁保护的一段代码,同一时刻只能有一个线程或进程执行这段代码。我们常说互斥锁保护临界区,实际上是说保护临界区中被多个线程或进程共享的数据。

同步中有一个称为生产者-消费者(producer-consumer)的经典问题。一个或多个生产者(线程或进程)产生一个个数据条目,这些条目由一个或多个消费者(线程或进程)处理。数据条目在生产者和消费者之间通过某种类型的IPC传递。

本节我们讨论多个生产者和单个消费者的问题,对于该问题,书中给出的一个模型如下:


整数数组buff含有被生产者和消费者共享的数据。生产者只是把数据0放入buff[0],把数据1放入buff[1],如此等等,表明生产了数据。消费者只是从数组开始,验证每个数组元素的正确性,表明消费了数据。对于这个模型,生产者-消费者问题有如下两个要求:

    1. 生产者生产出的数据是正确的。也就是说每个数据都必须被放到数组正确的位置。

    2. 消费者验证数据的结果是正确的。也就是说已放到正确位置的数据不能被认为是错误的数据。

为解决该问题,我们实现了第一个程序,代码如下:

#include "unpipc.h"

#define MAXNITEMS 1000000
#define MAXNTHREADS 100

int nitems;

struct {
pthread_mutex_t mutex; /*互斥锁*/
int buff[MAXNITEMS];
int nput; /*buff数组索引*/
int nval; /*放入buff[nput]的值*/
} shared = {
PTHREAD_MUTEX_INITIALIZER
};

void *produce(void *);
void *consume(void *);

int main(int argc, char **argv)
{
int i, nthreadas, count[MAXNTHREADS];
pthread_t tid_produce[MAXNITEMS], tid_consume;

if (argc != 3)
err_quit("usage: prodcons2 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreadas = min(atoi(argv[2]), MAXNTHREADS);

Pthread_setconcurrency(nthreadas);

/*创建nthreadas个生产者线程*/
for (i = 0; i < nthreadas; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}

/*等待所有生产者线程完成*/
for (i = 0; i < nthreadas; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}

/*创建消费者线程*/
Pthread_create(&tid_consume, NULL, consume, NULL);
Pthread_join(tid_consume, NULL);

exit(0);
}

/*生产者线程*/
void *produce(void *arg)
{
for ( ; ; ) {
/*修改共享的数据之前必须加锁*/
Pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
Pthread_mutex_unlock(&shared.mutex);
return NULL;
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
/*数据修改完后解锁*/
Pthread_mutex_unlock(&shared.mutex);
*((int*)arg) += 1;
}
}

/*消费者线程*/
void *consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return NULL;
}

共享的数据和变量都放在名为shared的变量中。主程序先创建多个生产者线程,每个线程之间使用互斥锁进行同步,这样可确保生产者生产出的数据的正确性。等所有生产者线程退出后再创建消费者线程,这样可确保验证数据结果的正确性。

该程序还对每个生产者线程生产的数据量进行了统计,其中一次输出的结果如下:

liu@ubuntu:~/work$ ./prodcons2 1000000 4

count[0] = 203549

count[1] = 1253

count[2] = 1310

count[3] = 793888

可看到每个生产者线程生产的数据量不同,多执行几次就会发现该程序每次输出的统计结果都不同。这说明了线程的执行顺序和执行时间的随机性。

上个程序是先创建所有的生产者线程,等生产者线程执行完成后再创建消费者线程,这样比较浪费时间。我们现在作些修改让所有生产者线程启动完成后立即启动消费者线程,这样在生产者线程生产完数据后,消费者线程就立即验证。修改后的代码如下:

#include "unpipc.h"

#define MAXNITEMS 1000000
#define MAXNTHREADS 100

int nitems;

struct {
pthread_mutex_t mutex;
int buff[MAXNITEMS];
int nput; /*生产者生产数据的索引*/
int nval; /*放入buff[nput]的值*/
} shared = {
PTHREAD_MUTEX_INITIALIZER
};

void *produce(void *);
void *consume(void *);

int main(int argc, char **argv)
{
int i, nthreadas, count[MAXNTHREADS];
pthread_t tid_produce[MAXNITEMS], tid_consume;

if (argc != 3)
err_quit("usage: prodcons2 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreadas = min(atoi(argv[2]), MAXNTHREADS);

Pthread_setconcurrency(nthreadas);

/*创建nthreadas个生产者线程*/
for (i = 0; i < nthreadas; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
/*创建消费者线程*/
Pthread_create(&tid_consume, NULL, consume, NULL);

/*等待所有线程完成*/
for (i = 0; i < nthreadas; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);

exit(0);
}

/*生产者线程*/
void *produce(void *arg)
{
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
Pthread_mutex_unlock(&shared.mutex);
return NULL;
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
Pthread_mutex_unlock(&shared.mutex);
*((int*)arg) += 1;
}
}

void consume_wait(int i)
{
/*轮询直到数据生成*/
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
Pthread_mutex_unlock(&shared.mutex);
return;
}
Pthread_mutex_unlock(&shared.mutex);
}
}

/*消费者线程*/
void *consume(void *arg)
{
int i;

for (i = 0; i < nitems; i++) {
consume_wait(i);
/*必须要等生产者线程生成数据后再验证*/
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return NULL;
}
现在,由于生产者线程和消费者线程同时执行,为了保证消费者线程验证数据的正确性,它必须要等待验证的数据被生产出来。于是我们添加了一个consume_wait函数,该函数一直轮询直到数据生成(比较要验证的数据和nput变量)。这种轮询的方式显然很浪费CPU资源,于是我们想到可以使用条件变量。修改后的代码如下:

#include "unpipc.h"

#define MAXNITEMS 1000000
#define MAXNTHREADS 100

int nitems;
int buff[MAXNITEMS];

struct {
pthread_mutex_t mutex;
int nput; /*生产者生产数据的索引*/
int nval; /*放入buff[nput]的值*/
} put = {
PTHREAD_MUTEX_INITIALIZER
};

/*使用条件变量,条件变量都和一个互斥锁一起使用*/
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready;
} nready = {
PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
};

void *produce(void *);
void *consume(void *);

int main(int argc, char **argv)
{
int i, nthreadas, count[MAXNTHREADS];
pthread_t tid_produce[MAXNITEMS], tid_consume;
int counts = 0;

if (argc != 3)
err_quit("usage: prodcons2 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreadas = min(atoi(argv[2]), MAXNTHREADS);

Pthread_setconcurrency(nthreadas);

/*创建nthreads个生产者线程*/
for (i = 0; i < nthreadas; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
/*创建消费者线程*/
Pthread_create(&tid_consume, NULL, consume, NULL);

/*等待所有线程完成*/
for (i = 0; i < nthreadas; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
counts += count[i];
}
Pthread_join(tid_consume, NULL);
printf("count = %d\n", counts);

exit(0);
}

/*生产者线程*/
void *produce(void *arg)
{
for ( ; ; ) {
/*先放入数据*/
Pthread_mutex_lock(&put.mutex);
if (put.nput >= nitems) {
Pthread_mutex_unlock(&put.mutex);
return NULL;
}
buff[put.nput] = put.nval;
put.nput++;
put.nval++;
Pthread_mutex_unlock(&put.mutex);
Pthread_mutex_lock(&nready.mutex);
/*如果nready.nready为0,则唤醒消费者线程*/
if (nready.nready == 0)
Pthread_cond_signal(&nready.cond);
/*nready++表示新生产一个数据*/
nready.nready++;
Pthread_mutex_unlock(&nready.mutex);
*((int*)arg) += 1;
}
}

/*消费者线程*/
void *consume(void *arg)
{
int i;

for (i = 0; i < nitems; i++) {
Pthread_mutex_lock(&nready.mutex);
/*nready.nready为0,表示还未生产出新数据,此时睡眠等待生产者线程生产数据后唤醒*/
while (nready.nready == 0)
Pthread_cond_wait(&nready.cond, &nready.mutex);
/*nready--表示消费了一个数据*/
nready.nready--;
Pthread_mutex_unlock(&nready.mutex);
if (buff[i] != i)
printf("buff[%d] = %d\n", i, buff[i]);
}
return NULL;
}


使用条件变量后,消费者线程在数据没有生成时阻塞,在数据生成后被生产者线程唤醒。这样就不会浪费CPU资源,同时也能保证验证数据的正确性。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: