互斥锁与条件变量及生产者-消费者问题
2016-03-17 20:26
519 查看
在多线程或者多进程之间共享数据时,同步是必需的,一个可行的方法是使用互斥锁与条件变量,这是一种最基本的同步形式,下面介绍其用法及在经典同步问题“生产者-消费者问题”中的应用。
例子中,采用单个消费者多个生产者的模式,生产者线程最大数量为MAXTHREADS,生产者生产的数据最大值为MAXITEMS,这两个值都需通过命令行指定,然后取它们的较小值min,g_itemnum全局变量保存了生产者生产的数据上限,局部变量theadnum保存了生产者线程数量。shared结构体为共享数据,buff数组为共享数据的内容,nindex和nvalue分别为buff下一个元素的索引和值。程序开始后,先创建生产者线程并等待生产者线程结束,然后创建消费者线程并等待消费者线程结束。生产者生产数据并保存到buff,其值和数组索引是一样的,并通过参数arg自增1以确定生产的数据个数,因为buff的索引和值是一样的,消费者则检查它们是否相同,不同时给以错误提示。由于没有使用同步机制,运行程序时,偶尔是正确的,偶尔又不是预期结果。
下面是一个错误的结果(生产了5905+4095+2=10002个数据):
下面同样是一个错误的结果:
当然也有正确的时候(概率性):
Posix互斥锁的数据类型为pthread_mutex_t,初始化方式有两种。如果是静态分配的,可以直接初始化为PTHREAD_MUTEX_INITIALIZER;如果是动态分配的如调用malloc,或者是分配在共享内存区中的,需要在运行时调用pthread_mutex_init函数来初始化。
Posix互斥锁的加锁解锁函数有如下三个:
lock是个阻塞函数,trylock非阻塞,当互斥锁已经上锁时,lock阻塞等待解锁,trylock返回EBUSY,unlock则用于解锁。
例子中,在shared结构体中加一个互斥锁并初始化,然后在生产者函数中对共享数据使用这个互斥锁,确保加锁、解锁配对,以防死锁现象发生,这样,程序每次执行后,都是预期结果。
运行结果如下:
条件变量重在某个条件,对这个条件进行操作时要使用互斥锁保护,该锁与pthread_cond_wait函数的mutex参数是同一个锁。wait函数是个阻塞函数,函数内部会原子的释放锁并挂起线程,获得signal时,尝试获取互斥锁后唤醒线程。signal函数发送信号,唤醒等待的线程。
修改上面的生产者-消费者模型,使用条件变量,main函数不用修改。
例子中,生产者生产的数据放到了全局变量g_buff中,shared结构体用于生产者,结构体成员包括一个互斥锁mutex和数组下一个值的索引nindex及值nvalue,ready结构体包括了一个互斥锁mutex和一个条件变量cond以及用于条件变量的测试条件ready,如果条件不成立,即ready值为0,消费者线程就wait等待,直到生产者线程发送signal将其唤醒。生产者发送signal后负责给ready加1,使得测试条件成立,消费者线程在wait被唤醒之后给ready减1,使得测试条件不成立。使用条件变量的生产者-消费者问题与上面使用的轮询的方法结果都是正确的。
1、生产者-消费者问题(一)
生产者-消费者问题是个经典的同步问题,也称为有界缓冲区问题,生产者、消费者代表某个线程或进程,生产者负责生产数据,消费者则对这些数据进行处理,数据是共享的,当有多个生产者或消费者时,同步会显得非常重要,下面例子说明不使用同步会有什么问题。// producer_consumer.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAXITEMS (100000) #define MAXTHREADS (10) #define min(a, b) ((a) > (b) ? (b) : (a)) int g_itemnum; struct { int buff[MAXITEMS]; int nindex; // next index in the buff int nvalue; // next value in the buff } shared; // shared struct for multi-thread void* producer(void*); void* consumer(void*); int main(int argc, char **argv) { int i; int threadnum, threadcount[MAXTHREADS]; pthread_t tid_producer[MAXTHREADS], tid_consumer; if (3 != argc) { printf("usage: %s <item_num> <thread_num>\n", argv[0]); } g_itemnum = min(atoi(argv[1]), MAXITEMS); threadnum = min(atoi(argv[2]), MAXTHREADS); printf("item = %d, thread = %d\n", g_itemnum, threadnum); pthread_setconcurrency(threadnum); // start all the producer threads for (i = 0; i < threadnum; ++i) { threadcount[i] = 0; if (0 != pthread_create(&tid_producer[i], NULL, producer, (void*)&threadcount[i])) { printf("pthread_create error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] created, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // wait for all the producer threads for (i = 0; i < threadnum; ++i) { if (0 != pthread_join(tid_producer[i], NULL)) { printf("pthread_join error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] done, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // start consumer thread if (0 != pthread_create(&tid_consumer, NULL, consumer, NULL)) { printf("pthread_create error consumer\n"); } printf("consumer: thread[%lu] created\n", tid_consumer); // wait for the consumer thread if (0 != pthread_join(tid_consumer, NULL)) { printf("pthread_join error consumer\n"); } printf("consumer: thread[%lu] done\n", tid_consumer); exit(EXIT_SUCCESS); } void* producer(void *arg) { for (;;) { if (shared.nindex >= g_itemnum) { return NULL; } shared.buff[shared.nindex] = shared.nvalue; shared.nindex++; shared.nvalue++; *((int*)arg) += 1; } return NULL; } void* consumer(void *arg) { int i; for (i = 0; i < g_itemnum; ++i) { if (shared.buff[i] != i) { printf("error: buff[%d] = %d\n", i, shared.buff[i]); } } return NULL; }
例子中,采用单个消费者多个生产者的模式,生产者线程最大数量为MAXTHREADS,生产者生产的数据最大值为MAXITEMS,这两个值都需通过命令行指定,然后取它们的较小值min,g_itemnum全局变量保存了生产者生产的数据上限,局部变量theadnum保存了生产者线程数量。shared结构体为共享数据,buff数组为共享数据的内容,nindex和nvalue分别为buff下一个元素的索引和值。程序开始后,先创建生产者线程并等待生产者线程结束,然后创建消费者线程并等待消费者线程结束。生产者生产数据并保存到buff,其值和数组索引是一样的,并通过参数arg自增1以确定生产的数据个数,因为buff的索引和值是一样的,消费者则检查它们是否相同,不同时给以错误提示。由于没有使用同步机制,运行程序时,偶尔是正确的,偶尔又不是预期结果。
$gcc -o test -lpthread producer_consumer.c $./test 10000 5
下面是一个错误的结果(生产了5905+4095+2=10002个数据):
item = 10000, thread = 5 producer: thread[140116206200576] created, threadcount[0] = 0 producer: thread[140116197807872] created, threadcount[1] = 0 producer: thread[140116189415168] created, threadcount[2] = 0 producer: thread[140116181022464] created, threadcount[3] = 0 producer: thread[140116172629760] created, threadcount[4] = 0 producer: thread[140116206200576] done, threadcount[0] = 5905 producer: thread[140116197807872] done, threadcount[1] = 4095 producer: thread[140116189415168] done, threadcount[2] = 2 producer: thread[140116181022464] done, threadcount[3] = 0 producer: thread[140116172629760] done, threadcount[4] = 0 consumer: thread[140116172629760] created consumer: thread[140116172629760] done
下面同样是一个错误的结果:
item = 10000, thread = 5 producer: thread[140041301595904] created, threadcount[0] = 0 producer: thread[140041293203200] created, threadcount[1] = 0 producer: thread[140041284810496] created, threadcount[2] = 0 producer: thread[140041276417792] created, threadcount[3] = 0 producer: thread[140041268025088] created, threadcount[4] = 0 producer: thread[140041301595904] done, threadcount[0] = 7137 producer: thread[140041293203200] done, threadcount[1] = 833 producer: thread[140041284810496] done, threadcount[2] = 2048 producer: thread[140041276417792] done, threadcount[3] = 158 producer: thread[140041268025088] done, threadcount[4] = 0 consumer: thread[140041268025088] created error: buff[9028] = 9027 error: buff[9144] = 9151 consumer: thread[140041268025088] done
当然也有正确的时候(概率性):
item = 10000, thread = 5 producer: thread[140462046070528] created, threadcount[0] = 0 producer: thread[140462037677824] created, threadcount[1] = 0 producer: thread[140462029285120] created, threadcount[2] = 0 producer: thread[140462020892416] created, threadcount[3] = 0 producer: thread[140462012499712] created, threadcount[4] = 0 producer: thread[140462046070528] done, threadcount[0] = 0 producer: thread[140462037677824] done, threadcount[1] = 0 producer: thread[140462029285120] done, threadcount[2] = 0 producer: thread[140462020892416] done, threadcount[3] = 0 producer: thread[140462012499712] done, threadcount[4] = 10000 consumer: thread[140462012499712] created consumer: thread[140462012499712] done
2、互斥锁
互斥锁mutex即MUTual EXclusion互相排斥,用于保护临界区中的共享数据。在多线程同步中,当一个线程对互斥锁上锁之后,我们说该线程获得了这个互斥锁,其它线程将不能获得这个互斥锁,要么阻塞等待,要么非阻塞出错返回,也就不能执行临界区中的代码了,从而起到了同步效果;当这个互斥锁被释放时,如果有多个线程都处于等待状态,优先级最高的线程将被唤醒以获得该互斥。Posix互斥锁的数据类型为pthread_mutex_t,初始化方式有两种。如果是静态分配的,可以直接初始化为PTHREAD_MUTEX_INITIALIZER;如果是动态分配的如调用malloc,或者是分配在共享内存区中的,需要在运行时调用pthread_mutex_init函数来初始化。
Posix互斥锁的加锁解锁函数有如下三个:
#include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mptr); int pthread_mutex_trylock(pthread_mutex_t *mptr); int pthread_mutex_unlock(pthread_mutex_t *mptr);
lock是个阻塞函数,trylock非阻塞,当互斥锁已经上锁时,lock阻塞等待解锁,trylock返回EBUSY,unlock则用于解锁。
3、生产者-消费者问题(二)
使用互斥锁修改上面的例子,完成一个正确的生产者-消费者模型。// producer_consumer2.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAXITEMS (100000) #define MAXTHREADS (10) #define min(a, b) ((a) > (b) ? (b) : (a)) int g_itemnum; struct { pthread_mutex_t mutex; int buff[MAXITEMS]; int nindex; int nvalue; } shared = { PTHREAD_MUTEX_INITIALIZER }; void* producer(void*); void* consumer(void*); int main(int argc, char **argv) { int i; int threadnum, threadcount[MAXTHREADS]; pthread_t tid_producer[MAXTHREADS], tid_consumer; if (3 != argc) { printf("usage: %s <item_num> <thread_num>\n", argv[0]); } g_itemnum = min(atoi(argv[1]), MAXITEMS); threadnum = min(atoi(argv[2]), MAXTHREADS); printf("item = %d, thread = %d\n", g_itemnum, threadnum); pthread_setconcurrency(threadnum); // start all the producer threads for (i = 0; i < threadnum; ++i) { threadcount[i] = 0; if (0 != pthread_create(&tid_producer[i], NULL, producer, (void*)&threadcount[i])) { printf("pthread_create error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] created, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // wait for all the producer threads for (i = 0; i < threadnum; ++i) { if (0 != pthread_join(tid_producer[i], NULL)) { printf("pthread_join error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] done, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // start consumer thread if (0 != pthread_create(&tid_consumer, NULL, consumer, NULL)) { printf("pthread_create error consumer\n"); } printf("consumer: thread[%lu] created\n", tid_consumer); // wait for the consumer thread if (0 != pthread_join(tid_consumer, NULL)) { printf("pthread_join error consumer\n"); } printf("consumer: thread[%lu] done\n", tid_consumer); exit(EXIT_SUCCESS); } void* producer(void *arg) { for (;;) { pthread_mutex_lock(&shared.mutex); if (shared.nindex >= g_itemnum) { pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buff[shared.nindex] = shared.nvalue; shared.nindex++; shared.nvalue++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) += 1; } return NULL; } void* consumer(void *arg) { int i; for (i = 0; i < g_itemnum; ++i) { if (shared.buff[i] != i) { printf("error: buff[%d] = %d\n", i, shared.buff[i]); } } return NULL; }
例子中,在shared结构体中加一个互斥锁并初始化,然后在生产者函数中对共享数据使用这个互斥锁,确保加锁、解锁配对,以防死锁现象发生,这样,程序每次执行后,都是预期结果。
4、生产者-消费者问题(三)
上面的例子中,生产者-消费者模型虽然是正确的,但其消费者函数有不足之处:先创建生产者线程,然后等待其完成后才创建消费者线程,消费者函数是没有问题的,如果不等待生产者线程完成就创建消费者线程呢,这时就有问题了。只要修改消费者函数,添加互斥锁,保证消费者访问的数据确实由生产者生产即可,如果消费者超前生产者,那就要等待一段时间了,这是一个轮询过程,会浪费一定的CPU时间。// producer_consumer3.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAXITEMS (100000) #define MAXTHREADS (10) #define min(a, b) ((a) > (b) ? (b) : (a)) int g_itemnum; struct { pthread_mutex_t mutex; int buff[MAXITEMS]; int nindex; int nvalue; } shared = { PTHREAD_MUTEX_INITIALIZER }; void* producer(void*); void* consumer(void*); int main(int argc, char **argv) { int i; int threadnum, threadcount[MAXTHREADS]; pthread_t tid_producer[MAXTHREADS], tid_consumer; if (3 != argc) { printf("usage: %s <item_num> <thread_num>\n", argv[0]); } g_itemnum = min(atoi(argv[1]), MAXITEMS); threadnum = min(atoi(argv[2]), MAXTHREADS); printf("item = %d, thread = %d\n", g_itemnum, threadnum); pthread_setconcurrency(threadnum + 1); // start all the producer threads for (i = 0; i < threadnum; ++i) { threadcount[i] = 0; if (0 != pthread_create(&tid_producer[i], NULL, producer, (void*)&threadcount[i])) { printf("pthread_create error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] created, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // start consumer thread if (0 != pthread_create(&tid_consumer, NULL, consumer, NULL)) { printf("pthread_create error consumer\n"); } printf("consumer: thread[%lu] created\n", tid_consumer); // wait for all the producer threads for (i = 0; i < threadnum; ++i) { if (0 != pthread_join(tid_producer[i], NULL)) { printf("pthread_join error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] done, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // wait for the consumer thread if (0 != pthread_join(tid_consumer, NULL)) { printf("pthread_join error consumer\n"); } printf("consumer: thread[%lu] done\n", tid_consumer); exit(EXIT_SUCCESS); } void* producer(void *arg) { for (;;) { pthread_mutex_lock(&shared.mutex); if (shared.nindex >= g_itemnum) { pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buff[shared.nindex] = shared.nvalue; shared.nindex++; shared.nvalue++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) += 1; } return NULL; } void* consumer(void *arg) { int i; for (i = 0; i < g_itemnum; ++i) { // wait for (;;) { pthread_mutex_lock(&shared.mutex); if (i < shared.nindex) { pthread_mutex_unlock(&shared.mutex); return NULL; } pthread_mutex_unlock(&shared.mutex); } if (shared.buff[i] != i) { printf("error: buff[%d] = %d\n", i, shared.buff[i]); } } return NULL; }
运行结果如下:
item = 10000, thread = 5 producer: thread[140551287768832] created, threadcount[0] = 0 producer: thread[140551279376128] created, threadcount[1] = 0 producer: thread[140551270983424] created, threadcount[2] = 0 producer: thread[140551262590720] created, threadcount[3] = 0 producer: thread[140551254198016] created, threadcount[4] = 0 consumer: thread[140551245805312] created producer: thread[140551287768832] done, threadcount[0] = 0 producer: thread[140551279376128] done, threadcount[1] = 6857 producer: thread[140551270983424] done, threadcount[2] = 3027 producer: thread[140551262590720] done, threadcount[3] = 116 producer: thread[140551254198016] done, threadcount[4] = 0 consumer: thread[140551245805312] done
5、使用条件变量
条件变量用于等待,与一个互斥锁相关,其数据类型为pthread_cond_t,初始化与互斥锁类似,下面是两个相关函数:#include <pthread.h> int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_signal(pthread_cond_t *cond);
条件变量重在某个条件,对这个条件进行操作时要使用互斥锁保护,该锁与pthread_cond_wait函数的mutex参数是同一个锁。wait函数是个阻塞函数,函数内部会原子的释放锁并挂起线程,获得signal时,尝试获取互斥锁后唤醒线程。signal函数发送信号,唤醒等待的线程。
修改上面的生产者-消费者模型,使用条件变量,main函数不用修改。
// producer_consumer4.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAXITEMS (100000) #define MAXTHREADS (10) #define min(a, b) ((a) > (b) ? (b) : (a)) int g_itemnum; int g_buff[MAXITEMS]; struct { pthread_mutex_t mutex; int nindex; // next index in g_buff int nvalue; // next value in g_buff } shared = { PTHREAD_MUTEX_INITIALIZER }; struct { pthread_mutex_t mutex; pthread_cond_t cond; int ready; // if ready for consumer } ready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }; void* producer(void*); void* consumer(void*); int main(int argc, char **argv) { int i; int threadnum, threadcount[MAXTHREADS]; pthread_t tid_producer[MAXTHREADS], tid_consumer; if (3 != argc) { printf("usage: %s <item_num> <thread_num>\n", argv[0]); } g_itemnum = min(atoi(argv[1]), MAXITEMS); threadnum = min(atoi(argv[2]), MAXTHREADS); printf("item = %d, thread = %d\n", g_itemnum, threadnum); pthread_setconcurrency(threadnum + 1); // start all the producer threads for (i = 0; i < threadnum; ++i) { threadcount[i] = 0; if (0 != pthread_create(&tid_producer[i], NULL, producer, (void*)&threadcount[i])) { printf("pthread_create error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] created, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // start consumer thread if (0 != pthread_create(&tid_consumer, NULL, consumer, NULL)) { printf("pthread_create error consumer\n"); } printf("consumer: thread[%lu] created\n", tid_consumer); // wait for all the producer threads for (i = 0; i < threadnum; ++i) { if (0 != pthread_join(tid_producer[i], NULL)) { printf("pthread_join error producer %d\n", i); exit(EXIT_FAILURE); } printf("producer: thread[%lu] done, threadcount[%d] = %d\n", tid_producer[i], i, threadcount[i]); } // wait for the consumer thread if (0 != pthread_join(tid_consumer, NULL)) { printf("pthread_join error consumer\n"); } printf("consumer: thread[%lu] done\n", tid_consumer); exit(EXIT_SUCCESS); } void* producer(void *arg) { for (;;) { pthread_mutex_lock(&shared.mutex); if (shared.nindex >= g_itemnum) { pthread_mutex_unlock(&shared.mutex); return NULL; } g_buff[shared.nindex] = shared.nvalue; shared.nindex++; shared.nvalue++; pthread_mutex_unlock(&shared.mutex); // cond pthread_mutex_lock(&ready.mutex); if (0 == ready.ready) { pthread_cond_signal(&ready.cond); } ready.ready++; pthread_mutex_unlock(&ready.mutex); *((int*)arg) += 1; } return NULL; } void* consumer(void *arg) { int i; for (i = 0; i < g_itemnum; ++i) { // cond pthread_mutex_lock(&ready.mutex); while (0 == ready.ready) { pthread_cond_wait(&ready.cond, &ready.mutex); } ready.ready--; pthread_mutex_unlock(&ready.mutex); if (g_buff[i] != i) { printf("error: buff[%d] = %d\n", i, g_buff[i]); } } return NULL; }
例子中,生产者生产的数据放到了全局变量g_buff中,shared结构体用于生产者,结构体成员包括一个互斥锁mutex和数组下一个值的索引nindex及值nvalue,ready结构体包括了一个互斥锁mutex和一个条件变量cond以及用于条件变量的测试条件ready,如果条件不成立,即ready值为0,消费者线程就wait等待,直到生产者线程发送signal将其唤醒。生产者发送signal后负责给ready加1,使得测试条件成立,消费者线程在wait被唤醒之后给ready减1,使得测试条件不成立。使用条件变量的生产者-消费者问题与上面使用的轮询的方法结果都是正确的。
相关文章推荐
- SVM支持向量机
- Unity3D中基本GUI控件介绍
- 第3周项目1 经过几次猜对数字大小
- Linux_开发板通过NFS挂载到虚拟机的几个错误解决
- 记录自己安装Android Studio
- OpenStack二三事(2)
- 第二次上机实验-1
- hdu Jungle Roads(最小生成树)
- Word小技巧之:如何去掉Word文字下面的波浪线
- 2016春季练习——二次筛
- Android Studio学习之签名打包APK
- 通过nfs挂载主机目录到开发板(jz2440)
- HDOJ 1050 Moving Tables
- Eclipse-new-Class文件时Interfaces-Add-Choose interfaces里找不到GenericServlet和HttpServlet
- 夺命雷公狗---DEDECMS----9dedecms单标签
- 有关二叉树的简单实现
- MapReduce on HBase
- 【最全版】Java正则表达式判断手机号码【2014版】
- Python TCP编程入门学习:(1)
- 傲慢与偏见