C++11 Multithreading (16): Hands-On with the Producer-Consumer Model

2016-01-06 15:57
Reference: http://www.cnblogs.com/haippy/p/3252092.html (a good blog post)

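Contents

1. Single-producer, single-consumer model

2. Single-producer, multi-consumer model

3. Multi-producer, single-consumer model

4. Multi-producer, multi-consumer model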

This section covers a classic problem: the producer-consumer model.

1. Single-producer, single-consumer model

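As the name suggests, the single-producer, single-consumer model has exactly one producer and one consumer:
The producer keeps putting items into the repository;
The consumer takes items out of the repository;
The repository has limited capacity and can hold only a fixed number of items;
If the producer produces too fast and the repository fills up, it must wait until the consumer has taken an item, so that the repository is no longer full, before it can put in a new item;
Conversely, if the consumer takes items too fast, it may find the repository empty; it must then wait until the producer has put in an item before it can continue.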
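The full and empty conditions described above can be sketched with two indices moving around a ring. This is a minimal single-threaded sketch, not the article's code: the names `Ring`, `is_full`, `is_empty`, `push`, `pop` and `usable_capacity` are hypothetical, and it assumes the convention of keeping one slot unused so that "full" and "empty" can be told apart. Under that assumption a ring of N slots stores at most N - 1 items.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical single-threaded ring buffer; it illustrates the index
// arithmetic only and does no locking.
template <std::size_t N>
struct Ring {
    int slots[N] = {};
    std::size_t read_pos = 0;   // next slot to read
    std::size_t write_pos = 0;  // next slot to write

    // Empty: both indices point at the same slot.
    bool is_empty() const { return read_pos == write_pos; }
    // Full: advancing write_pos would make it collide with read_pos.
    bool is_full() const { return (write_pos + 1) % N == read_pos; }

    // Returns false instead of blocking when the ring is full.
    bool push(int item) {
        if (is_full()) return false;
        slots[write_pos] = item;
        write_pos = (write_pos + 1) % N;
        return true;
    }

    // Returns false instead of blocking when the ring is empty.
    bool pop(int &out) {
        if (is_empty()) return false;
        out = slots[read_pos];
        read_pos = (read_pos + 1) % N;
        return true;
    }
};

// Counts how many pushes succeed on a fresh ring of N slots.
template <std::size_t N>
std::size_t usable_capacity() {
    Ring<N> ring;
    std::size_t count = 0;
    while (ring.push(static_cast<int>(count))) ++count;
    return count;
}
```

In a threaded version, the places where `push` and `pop` return false are where the producer or the consumer would wait on a condition variable instead.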

The C++11 implementation of the single-producer, single-consumer model is as follows:

<span style="font-size:18px;">#include "stdafx.h" // MSVC precompiled header; remove this line on other toolchains
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic> // std::atomic
#include <chrono> // std::chrono::seconds
#include <cstdlib>
#include <condition_variable>
using namespace std;

static const int kItemRepositorySize = 10; // Item buffer size. One slot always stays unused, so at most 9 items are stored.
static const int kItemsToProduce = 15; // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize]; // Item buffer; with read_position and write_position it models a ring queue.
    size_t read_position; // Position the consumer reads from.
    size_t write_position; // Position the producer writes to.
    std::mutex mtx; // Mutex protecting the item buffer.
    std::condition_variable repo_not_full; // Condition variable signalling that the buffer is not full.
    std::condition_variable repo_not_empty; // Condition variable signalling that the buffer is not empty.
} gItemRepository; // Global repository shared by the producer and the consumer.

typedef struct ItemRepository ItemRepository;

void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);
    // Why a while loop: the buffer is used as a ring. If the slot after
    // write_position is the one the consumer will read next, the buffer is full,
    // so wait until the consumer takes an item and signals repo_not_full.
    // Re-checking the condition also guards against spurious wake-ups.
    while (((ir->write_position + 1) % kItemRepositorySize)
        == ir->read_position)
    { // Item buffer is full, just wait here.
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock); // Wait for the "buffer is not full" condition.
    }

    (ir->item_buffer)[ir->write_position] = item; // Write the item.
    (ir->write_position)++; // Advance the write position.

    if (ir->write_position == kItemRepositorySize) // Wrap around to the start after the last slot.
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all(); // Tell the consumer that the buffer is not empty.
    lock.unlock(); // Unlock.
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // Why a while loop: the buffer is used as a ring. If read_position has caught
    // up with write_position, the producer has not yet written the next slot and
    // the buffer is empty, so wait until the producer signals repo_not_empty.
    while (ir->write_position == ir->read_position)
    { // Item buffer is empty, just wait here.
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock); // Wait for the "buffer is not empty" condition.
    }

    data = (ir->item_buffer)[ir->read_position]; // Read one item.
    (ir->read_position)++; // Advance the read position.

    if (ir->read_position >= kItemRepositorySize) // Wrap around to the start after the last slot.
        ir->read_position = 0;

    (ir->repo_not_full).notify_all(); // Tell the producer that the buffer is not full.
    lock.unlock(); // Unlock.

    return data; // Return the item.
}

void ProducerTask() // Producer task.
{
    for (int i = 1; i <= kItemsToProduce; ++i)
    {
        // Uncomment to slow the producer down:
        // this_thread::sleep_for(chrono::seconds(1));
        std::cout << "Produce the " << i << "^th item..." << std::endl;
        ProduceItem(&gItemRepository, i); // Produce kItemsToProduce items in a loop.
    }
}

void ConsumerTask() // Consumer task.
{
    static int cnt = 0;
    while (1)
    {
        this_thread::sleep_for(chrono::seconds(1));
        int item = ConsumeItem(&gItemRepository); // Consume one item.
        std::cout << "Consume the " << item << "^th item" << std::endl;
        if (++cnt == kItemsToProduce) break; // Exit once kItemsToProduce items have been consumed.
    }
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0; // Initialize the write position.
    ir->read_position = 0; // Initialize the read position.
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer(ProducerTask); // Create the producer thread.
    std::thread consumer(ConsumerTask); // Create the consumer thread.
    producer.join();
    consumer.join();
}</span>Output:
<span style="font-size:18px;">Produce the 1^th item...
Produce the 2^th item...
Produce the 3^th item...
Produce the 4^th item...
Produce the 5^th item...
Produce the 6^th item...
Produce the 7^th item...
Produce the 8^th item...
Produce the 9^th item...
Produce the 10^th item...
Producer is waiting for an empty slot...
Consume the 1^th item
Produce the 11^th item...
Producer is waiting for an empty slot...
Consume the 2^th item
Produce the 12^th item...
Producer is waiting for an empty slot...
Consume the 3^th item
Produce the 13^th item...
Producer is waiting for an empty slot...
Consume the 4^th item
Produce the 14^th item...
Producer is waiting for an empty slot...
Consume the 5^th item
Produce the 15^th item...
Producer is waiting for an empty slot...
Consume the 6^th item
Consume the 7^th item
Consume the 8^th item
Consume the 9^th item
Consume the 10^th item
Consume the 11^th item
Consume the 12^th item
Consume the 13^th item
Consume the 14^th item
Consume the 15^th item
Press any key to continue . . .</span>
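The hand-written `while` loops around `wait` in the listing above re-check the condition after every wake-up, including spurious ones. `std::condition_variable::wait` also has an overload that takes a predicate and performs the same loop internally. The following is a sketch under stated assumptions rather than a drop-in replacement: `BoundedBuffer`, `put`, `take` and `run_spsc` are hypothetical names, and a `std::queue` with an explicit capacity stands in for the article's ring array.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded buffer; std::queue replaces the article's ring array.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    void put(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        // Same effect as: while (buffer is full) not_full_.wait(lock);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(item);
        lock.unlock();            // the woken thread will not block on the mutex
        not_empty_.notify_one();  // one new item can serve one waiting consumer
    }

    int take() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        int item = items_.front();
        items_.pop();
        lock.unlock();
        not_full_.notify_one();
        return item;
    }

private:
    std::size_t capacity_;
    std::queue<int> items_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// Runs one producer and one consumer over item_count items (1..item_count)
// and returns the sum of everything the consumer received.
long run_spsc(int item_count, std::size_t capacity) {
    BoundedBuffer buffer(capacity);
    long sum = 0;
    std::thread producer([&] {
        for (int i = 1; i <= item_count; ++i) buffer.put(i);
    });
    std::thread consumer([&] {
        for (int i = 0; i < item_count; ++i) sum += buffer.take();
    });
    producer.join();
    consumer.join();  // after join, the consumer's writes to sum are visible
    return sum;
}
```

Calling `run_spsc(15, 9)` mirrors the run above: 15 items pass through a buffer that holds at most 9 at a time.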

2. Single-producer, multi-consumer model

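Unlike the single-producer, single-consumer model, the single-producer, multi-consumer model lets several consumers take items from the repository. Besides protecting the repository with mutual exclusion across all reading and writing threads, the program must also maintain a counter of the items the consumers have taken. The code is as follows: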
<span style="font-size:18px;">#include "stdafx.h" // MSVC precompiled header; remove this line on other toolchains
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic> // std::atomic
#include <chrono> // std::chrono::seconds
#include <cstdlib>
#include <condition_variable>
using namespace std;

static const int kItemRepositorySize = 4; // Item buffer size.
static const int kItemsToProduce = 10; // How many items we plan to produce.

struct ItemRepository {
int item_buffer[kItemRepositorySize];
size_t read_position;
size_t write_position;
size_t item_counter;
std::mutex mtx;
std::mutex item_counter_mtx;
std::condition_variable repo_not_full;
std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;

void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock);
}

(ir->item_buffer)[ir->write_position] = item;
(ir->write_position)++;

if (ir->write_position == kItemRepositorySize)
ir->write_position = 0;

(ir->repo_not_empty).notify_all();
lock.unlock();
}

int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock);
}

data = (ir->item_buffer)[ir->read_position];
(ir->read_position)++;

if (ir->read_position >= kItemRepositorySize)
ir->read_position = 0;

(ir->repo_not_full).notify_all();
lock.unlock();

return data;
}

void ProducerTask()
{
for (int i = 1; i <= kItemsToProduce; ++i) {
// sleep(1);
std::cout << "Producer thread " << std::this_thread::get_id()
<< " producing the " << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i);
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}

void ConsumerTask()
{
    bool ready_to_exit = false;
    while (1)
    {
        this_thread::sleep_for(chrono::seconds(1));
        // item_counter is shared by all consumers, so guard it with its own mutex.
        std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
        if (gItemRepository.item_counter < kItemsToProduce)
        {
            // The counter mutex stays locked while ConsumeItem may block, so only
            // one consumer at a time waits on the repository.
            int item = ConsumeItem(&gItemRepository);
            ++(gItemRepository.item_counter);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        }
        else
            ready_to_exit = true; // All items have been consumed.
        lock.unlock();
        if (ready_to_exit == true)
            break;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id()
        << " is exiting..." << std::endl;
}

void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0;
ir->read_position = 0;
ir->item_counter = 0;
}

int main()
{
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask);
std::thread consumer1(ConsumerTask);
std::thread consumer2(ConsumerTask);
std::thread consumer3(ConsumerTask);
std::thread consumer4(ConsumerTask);

producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
}</span>Output:
<span style="font-size:18px;">Producer thread 8572 producing the 1^th item...
Producer thread 8572 producing the 2^th item...
Producer thread 8572 producing the 3^th item...
Producer thread 8572 producing the 4^th item...
Producer is waiting for an empty slot...
Consumer thread 7276 is consuming the 1Producer thread 8572 producing the 5^th i
tem...
Producer is waiting for an empty slot...
^th item
Consumer thread 9996 is consuming the 2Producer thread 8572 producing the 6^th i
tem...^th item

Producer is waiting for an empty slot...
Consumer thread 12076 is consuming the 3^th itemProducer thread 8572
producing the 7^th item...
Producer thread 8572 producing the 8^th item...
Producer is waiting for an empty slot...
Consumer thread 10332 is consuming the 4^th item
Consumer thread 7276 is consuming the 5^th item
Producer thread 8572 producing the 9^th item...Consumer thread 9996 is consuming
the 6^th item

Producer thread 8572 producing the 10^th item...
Consumer thread 12076 is consuming the 7^th item
Producer thread 8572 is exiting...
Consumer thread 10332 is consuming the 8^th item
Consumer thread 7276 is consuming the 9^th item
Consumer thread 9996 is consuming the 10^th item
Consumer thread 12076 is exiting...
Consumer thread 10332 is exiting...
Consumer thread 7276 is exiting...
Consumer thread 9996 is exiting...
Press any key to continue . . .
</span>
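In the output above, lines from different threads are spliced together. Each `<<` is a separate write to `std::cout`, so another thread can write between two pieces of the same line. One way to keep lines intact, shown here as a sketch and not as part of the original program, is to format the whole line privately and then write it in one piece while holding a mutex. The names `g_out_mtx`, `log_line` and `count_intact_lines` are hypothetical; the helper takes any `std::ostream` so it can be checked against a string stream.

```cpp
#include <cassert>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

std::mutex g_out_mtx;  // hypothetical: guards the shared output stream

// Formats the line in a private buffer, then writes it in one piece under the lock.
void log_line(std::ostream &out, const std::string &who, int item) {
    std::ostringstream line;
    line << who << " is consuming the " << item << "^th item\n";
    std::lock_guard<std::mutex> guard(g_out_mtx);
    out << line.str();
}

// Starts thread_count threads that each log lines_per_thread lines into one
// shared stream, then returns how many well-formed lines were written.
int count_intact_lines(int thread_count, int lines_per_thread) {
    std::ostringstream shared;
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&shared, t, lines_per_thread] {
            for (int i = 1; i <= lines_per_thread; ++i)
                log_line(shared, "Consumer " + std::to_string(t), i);
        });
    }
    for (auto &th : threads) th.join();

    const std::string head = "Consumer ";
    const std::string tail = "^th item";
    std::istringstream in(shared.str());
    std::string line;
    int intact = 0;
    while (std::getline(in, line)) {
        // Intact: starts with head, ends with tail, and holds no second head.
        bool starts = line.compare(0, head.size(), head) == 0;
        bool ends = line.size() >= tail.size() &&
            line.compare(line.size() - tail.size(), tail.size(), tail) == 0;
        bool single = line.find(head, 1) == std::string::npos;
        if (starts && ends && single) ++intact;
    }
    return intact;
}
```

In the programs in this article the call would be `log_line(std::cout, "Consumer thread", item)` or similar, with every thread going through the same helper.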

3. Multi-producer, single-consumer model

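Unlike the single-producer, single-consumer model, the multi-producer, single-consumer model lets several producers put items into the repository. Besides protecting the repository with mutual exclusion across all reading and writing threads, the program must also maintain a counter of the items the producers have put in. The code is as follows: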
<span style="font-size:18px;">#include "stdafx.h" // MSVC precompiled header; remove this line on other toolchains
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic> // std::atomic
#include <chrono> // std::chrono::seconds
#include <cstdlib>
#include <condition_variable>
using namespace std;

static const int kItemRepositorySize = 4; // Item buffer size.
static const int kItemsToProduce = 10; // How many items we plan to produce.

struct ItemRepository {
int item_buffer[kItemRepositorySize];
size_t read_position;
size_t write_position;
size_t item_counter;
std::mutex mtx;
std::mutex item_counter_mtx;
std::condition_variable repo_not_full;
std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;

void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock);
}

(ir->item_buffer)[ir->write_position] = item;
(ir->write_position)++;

if (ir->write_position == kItemRepositorySize)
ir->write_position = 0;

(ir->repo_not_empty).notify_all();
lock.unlock();
}

int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock);
}

data = (ir->item_buffer)[ir->read_position];
(ir->read_position)++;

if (ir->read_position >= kItemRepositorySize)
ir->read_position = 0;

(ir->repo_not_full).notify_all();
lock.unlock();

return data;
}

void ProducerTask()
{
    bool ready_to_exit = false;
    while (1) {
        this_thread::sleep_for(chrono::seconds(1));
        // item_counter is shared by all producers, so guard it with its own mutex.
        std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
        if (gItemRepository.item_counter < kItemsToProduce) {
            ++(gItemRepository.item_counter);
            // The counter value doubles as the item; the counter mutex stays
            // locked while ProduceItem may block on a full buffer.
            ProduceItem(&gItemRepository, gItemRepository.item_counter);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " is producing the " << gItemRepository.item_counter
                << "^th item" << std::endl;
        }
        else ready_to_exit = true; // All items have been produced.
        lock.unlock();
        if (ready_to_exit == true) break;
    }
    std::cout << "Producer thread " << std::this_thread::get_id()
        << " is exiting..." << std::endl;
}

void ConsumerTask()
{
    // A plain static counter is enough here because there is only one consumer thread.
    static int item_consumed = 0;
    while (1) {
        this_thread::sleep_for(chrono::seconds(1));
        ++item_consumed;
        if (item_consumed <= kItemsToProduce) {
            int item = ConsumeItem(&gItemRepository);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        }
        else break; // All items have been consumed.
    }
    std::cout << "Consumer thread " << std::this_thread::get_id()
        << " is exiting..." << std::endl;
}

void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0;
ir->read_position = 0;
ir->item_counter = 0;
}

int main()
{
InitItemRepository(&gItemRepository);
std::thread producer1(ProducerTask);
std::thread producer2(ProducerTask);
std::thread producer3(ProducerTask);
std::thread producer4(ProducerTask);
std::thread consumer(ConsumerTask);

producer1.join();
producer2.join();
producer3.join();
producer4.join();
consumer.join();
}</span>Output:
<span style="font-size:18px;">Producer thread Consumer thread 9568 is consuming the 1^th item10516 is producin
g the 1
^th item
Producer thread 9948 is producing the 2^th item
Producer thread 9724 is producing the 3^th item
Producer thread 7248 is producing the 4^th item
Producer is waiting for an empty slot...
Producer thread 10516 is producing the 5^th item
Consumer thread 9568 is consuming the 2^th itemProducer is waiting for an empty
slot...

Consumer thread 9568 is consuming the 3^th itemProducer thread 9948 is producing
the 6
^th item
Producer is waiting for an empty slot...
Consumer thread 9568 is consuming the 4^th item
Producer thread 9724 is producing the 7^th item
Producer is waiting for an empty slot...
Consumer thread 9568 is consuming the 5^th item
Producer thread 7248 is producing the 8^th item
Producer is waiting for an empty slot...
Producer thread 10516 is producing the 9Consumer thread 9568 is consuming the 6^
th item^th item

Producer is waiting for an empty slot...
Consumer thread 9568 is consuming the 7^th item
Producer thread 9948 is producing the 10^th item
Producer thread 9724 is exiting...
Producer thread 7248 is exiting...
Producer thread 10516 is exiting...
Consumer thread 9568 is consuming the 8^th item
Producer thread 9948 is exiting...
Consumer thread 9568 is consuming the 9^th item
Consumer thread 9568 is consuming the 10^th item
Consumer thread 9568 is exiting...
Press any key to continue . . .</span>
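The listing above includes `<atomic>` but never uses it: the producers share `item_counter` under `item_counter_mtx`. As an alternative sketch, not the article's method, a producer could claim the next item number with `std::atomic<int>::fetch_add`, which hands each number to exactly one thread without a mutex. The names `claim_next` and `run_claims` are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: atomically claims the next item number in [1, limit].
// Returns 0 once every item number has already been claimed.
int claim_next(std::atomic<int> &counter, int limit) {
    int claimed = counter.fetch_add(1) + 1;  // fetch_add returns the old value
    return claimed <= limit ? claimed : 0;
}

// Lets producer_count threads claim numbers until the limit is reached and
// returns how many times each number was claimed (index 0 is unused).
std::vector<int> run_claims(int producer_count, int limit) {
    std::atomic<int> counter(0);
    std::vector<std::vector<int>> per_thread(producer_count);
    std::vector<std::thread> threads;
    for (int t = 0; t < producer_count; ++t) {
        threads.emplace_back([&counter, &per_thread, t, limit] {
            // Each thread appends only to its own vector, so no lock is needed.
            for (int n = claim_next(counter, limit); n != 0;
                 n = claim_next(counter, limit))
                per_thread[t].push_back(n);
        });
    }
    for (auto &th : threads) th.join();

    std::vector<int> times_claimed(limit + 1, 0);
    for (const auto &claimed : per_thread)
        for (int n : claimed) ++times_claimed[n];
    return times_claimed;
}
```

The trade-off is ordering: claiming a number and putting it into the repository are no longer one step, so items may enter the buffer out of numeric order, whereas the mutex version keeps them in order by holding the counter lock across `ProduceItem`.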

4. Multi-producer, multi-consumer model

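This model combines the previous two. The program maintains two counters: the number of items the producers have produced and the number of items the consumers have taken. The repository must also be protected so that the multiple producers and multiple consumers access it under mutual exclusion.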

<span style="font-size:18px;">#include "stdafx.h" // MSVC precompiled header; remove this line on other toolchains
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic> // std::atomic
#include <chrono> // std::chrono::seconds
#include <cstdlib>
#include <condition_variable>
using namespace std;

static const int kItemRepositorySize = 4; // Item buffer size.
static const int kItemsToProduce = 10; // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize];
    size_t read_position;
    size_t write_position;
    size_t produced_item_counter; // Number of items produced so far.
    size_t consumed_item_counter; // Number of items consumed so far.
    std::mutex mtx;
    std::mutex produced_item_counter_mtx; // Mutex guarding produced_item_counter.
    std::mutex consumed_item_counter_mtx; // Mutex guarding consumed_item_counter.
    std::condition_variable repo_not_full;
    std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;

void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while (((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock);
}

(ir->item_buffer)[ir->write_position] = item;
(ir->write_position)++;

if (ir->write_position == kItemRepositorySize)
ir->write_position = 0;

(ir->repo_not_empty).notify_all();
lock.unlock();
}

int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while (ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock);
}

data = (ir->item_buffer)[ir->read_position];
(ir->read_position)++;

if (ir->read_position >= kItemRepositorySize)
ir->read_position = 0;

(ir->repo_not_full).notify_all();
lock.unlock();

return data;
}

void ProducerTask()
{
    bool ready_to_exit = false;
    while (1)
    {
        this_thread::sleep_for(chrono::seconds(1));
        // Lock the producers' counter before reading or updating it.
        std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
        if (gItemRepository.produced_item_counter < kItemsToProduce) {
            ++(gItemRepository.produced_item_counter);
            ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " is producing the " << gItemRepository.produced_item_counter
                << "^th item" << std::endl;
        }
        else ready_to_exit = true;
        // Unlock.
        lock.unlock();
        if (ready_to_exit == true) break;
    }
    std::cout << "Producer thread " << std::this_thread::get_id()
        << " is exiting..." << std::endl;
}

void ConsumerTask()
{
    bool ready_to_exit = false;
    while (1)
    {
        this_thread::sleep_for(chrono::seconds(1));
        // Lock the consumers' counter before reading or updating it.
        std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
        if (gItemRepository.consumed_item_counter < kItemsToProduce) {
            int item = ConsumeItem(&gItemRepository);
            ++(gItemRepository.consumed_item_counter);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        }
        else ready_to_exit = true;
        // Unlock.
        lock.unlock();
        if (ready_to_exit == true) break;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id()
        << " is exiting..." << std::endl;
}

void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0;
ir->read_position = 0;
ir->produced_item_counter = 0;
ir->consumed_item_counter = 0;
}

int main()
{
InitItemRepository(&gItemRepository);
std::thread producer1(ProducerTask);
std::thread producer2(ProducerTask);
std::thread producer3(ProducerTask);
std::thread producer4(ProducerTask);

std::thread consumer1(ConsumerTask);
std::thread consumer2(ConsumerTask);
std::thread consumer3(ConsumerTask);
std::thread consumer4(ConsumerTask);

producer1.join();
producer2.join();
producer3.join();
producer4.join();

consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
}</span>

Output:
<span style="font-size:18px;">Producer thread Consumer thread 1192 is consuming the 1^th item5352 is producing
the 1
^th item
Consumer is waiting for items...
Producer thread 8716 is producing the 2^th itemConsumer thread 11236 is consumin
g the 2
^th item
Consumer thread 8572 is consuming the 3Producer thread 9368 is producing the 3^t
h item
Producer thread 8172 is producing the 4^th item
^th item
Consumer thread 10696 is consuming the 4^th item
Consumer is waiting for items...
Producer thread 5352 is producing the 5^th item
Consumer thread 1192 is consuming the 5^th item
Consumer thread 11236 is consuming the 6^th item
Producer thread 8716 is producing the 6^th item
Consumer is waiting for items...
Producer thread 9368 is producing the 7^th item
Consumer thread 8572 is consuming the 7^th item
Consumer is waiting for items...
Producer thread 8172 is producing the 8Consumer thread 10696 is consuming the 8^
th item
^th item
Producer thread 5352 is producing the 9^th item
Consumer thread 1192 is consuming the 9^th item
Producer thread 8716 is producing the 10^th itemConsumer thread 11236 is consumi
ng the 10^th item

Consumer thread 8572 is exiting...
Consumer thread 10696 is exiting...Producer thread 9368 is exiting...

Producer thread 8172 is exiting...
Producer thread 5352 is exiting...
Consumer thread 1192 is exiting...
Producer thread 8716 is exiting...Consumer thread 11236 is exiting...

Press any key to continue . . .</span>
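The program above decides when to stop by counting produced and consumed items against `kItemsToProduce`. A different shutdown scheme, sketched here as an assumption-laden alternative and not as the article's design, lets consumers run until the queue is explicitly closed and drained, so they need no shared counter. `ClosableQueue`, `push`, `pop`, `close` and `run_mpmc` are hypothetical names, and `std::queue` again stands in for the ring array.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded queue with a "closed" flag used for shutdown.
class ClosableQueue {
public:
    explicit ClosableQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(item);
        not_empty_.notify_one();
    }

    // Returns false once the queue has been closed and fully drained.
    bool pop(int &out) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false;  // closed and nothing left
        out = items_.front();
        items_.pop();
        not_full_.notify_one();
        return true;
    }

    // Call once, after every producer has finished.
    void close() {
        std::lock_guard<std::mutex> guard(mtx_);
        closed_ = true;
        not_empty_.notify_all();  // wake every consumer so each one can exit
    }

private:
    std::size_t capacity_;
    bool closed_ = false;
    std::queue<int> items_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// Producers together push the numbers 1..producer_count*items_each;
// returns the sum of everything the consumers received.
long run_mpmc(int producer_count, int consumer_count, int items_each) {
    ClosableQueue queue(3);
    std::vector<long> partial(consumer_count, 0);  // one slot per consumer
    std::vector<std::thread> producers, consumers;
    for (int c = 0; c < consumer_count; ++c)
        consumers.emplace_back([&queue, &partial, c] {
            int item = 0;
            while (queue.pop(item)) partial[c] += item;
        });
    for (int p = 0; p < producer_count; ++p)
        producers.emplace_back([&queue, p, items_each] {
            for (int i = 1; i <= items_each; ++i)
                queue.push(p * items_each + i);
        });
    for (auto &t : producers) t.join();
    queue.close();  // no more items will arrive
    for (auto &t : consumers) t.join();

    long total = 0;
    for (long s : partial) total += s;
    return total;
}
```

If every item is delivered exactly once, `run_mpmc(4, 4, 25)` returns the sum of 1 through 100. Unlike the counter-based version, consumers here are never serialized by a second mutex while they wait for items.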