windows 使用关键段和条件变量实现的生产者和消费者线程同步
2016-12-06 10:52
316 查看
关键段比较简单,调用函数如下:
VOID InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 初始化一个关键代码段
VOID EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 获取关键代码段的访问权
VOID LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 释放关键代码段的访问权
VOID DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 删除关键代码段
条件变量的介绍:
Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. Condition variables are user-mode objects that cannot be shared across processes.
Condition variables enable threads to atomically release a lock and enter the sleeping state. They can be used with critical sections or slim reader/writer (SRW) locks. Condition variables support operations that "wake one" or "wake all" waiting threads.
After a thread is woken, it re-acquires the lock it released when the thread entered the sleeping state.
Windows Server 2003 and Windows XP: Condition variables are not supported.
The following are the condition variable functions.
The following pseudocode demonstrates the typical usage pattern of condition variables.
The following code implements a producer/consumer queue. The queue is represented as a bounded circular buffer, and is protected by a critical section. The code uses two condition variables: one used by producers (BufferNotFull) and one used by consumers (BufferNotEmpty).
The code calls the InitializeConditionVariable function to create the condition variables. The consumer threads call the SleepConditionVariableCS function to wait for items to be added to the queue and the WakeConditionVariable function to signal the producer that it is ready for more items. The producer threads call SleepConditionVariableCS to wait for the consumer to remove items from the queue and WakeConditionVariable to signal the consumer that there are more items in the queue.
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
// Capacity of the circular queue, in items.
#define BUFFER_SIZE 10
// Exclusive upper bound, in milliseconds, of the random delay a producer
// sleeps (rand() % value) before creating each item.
#define PRODUCER_SLEEP_TIME_MS 500
// Exclusive upper bound, in milliseconds, of the random delay a consumer
// sleeps (rand() % value) after removing each item.
#define CONSUMER_SLEEP_TIME_MS 2000
// Storage for the circular queue of produced items.
LONG Buffer[BUFFER_SIZE];
// Most recently issued item number; producers advance it with
// InterlockedIncrement.
LONG LastItemProduced;
// Number of items currently stored in Buffer.
ULONG QueueSize;
// Index in Buffer of the oldest item, i.e. the next one to be consumed.
ULONG QueueStartOffset;
// Running totals of inserted and removed items; main prints them at exit.
ULONG TotalItemsProduced;
ULONG TotalItemsConsumed;
// Woken by producers after an insert; consumers sleep on it while the queue
// is empty.
CONDITION_VARIABLE BufferNotEmpty;
// Woken by consumers after a removal; producers sleep on it while the queue
// is full.
CONDITION_VARIABLE BufferNotFull;
// Held by the worker threads while they read or modify the queue state and
// the totals above.
CRITICAL_SECTION BufferLock;
// Set to TRUE by main (while holding BufferLock) to ask the workers to exit.
BOOL StopRequested;
/*
 * Producer thread entry point.
 *
 * Repeatedly sleeps for a random interval, takes the next item number and
 * appends it to the shared circular queue, waiting on BufferNotFull while
 * the queue is full. Exits as soon as StopRequested is observed under
 * BufferLock; an item number taken just before the stop is discarded.
 *
 * p:       producer id, passed as an integer stored in the pointer value.
 * returns: always 0.
 */
DWORD WINAPI ProducerThreadProc (PVOID p)
{
    ULONG ProducerId = (ULONG)(ULONG_PTR)p;

    for (;;)
    {
        // Simulate the time needed to produce a new item.
        Sleep (rand() % PRODUCER_SLEEP_TIME_MS);

        // InterlockedIncrement returns LONG, matching the Buffer element type.
        LONG Item = InterlockedIncrement (&LastItemProduced);

        EnterCriticalSection (&BufferLock);

        // Re-test the predicate after every wake-up: the wait releases the
        // lock, so the queue state may have changed before we own it again.
        while (QueueSize == BUFFER_SIZE && StopRequested == FALSE)
        {
            // Buffer is full - sleep so consumers can get items.
            SleepConditionVariableCS (&BufferNotFull, &BufferLock, INFINITE);
        }

        if (StopRequested == TRUE)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Insert the item at the end of the queue and increment size.
        Buffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item;
        QueueSize++;
        TotalItemsProduced++;

        // ULONG is unsigned long and LONG is long, hence %lu / %ld.
        printf ("Producer %lu: item %2ld, queue size %2lu\r\n",
                ProducerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a consumer is waiting, wake it.
        WakeConditionVariable (&BufferNotEmpty);
    }

    printf ("Producer %lu exiting\r\n", ProducerId);
    return 0;
}
/*
 * Consumer thread entry point.
 *
 * Repeatedly removes the oldest item from the shared circular queue,
 * waiting on BufferNotEmpty while the queue is empty, then sleeps for a
 * random interval to simulate processing. After a stop request it keeps
 * consuming until the queue is empty and only then exits.
 *
 * p:       consumer id, passed as an integer stored in the pointer value.
 * returns: always 0.
 */
DWORD WINAPI ConsumerThreadProc (PVOID p)
{
    ULONG ConsumerId = (ULONG)(ULONG_PTR)p;

    for (;;)
    {
        EnterCriticalSection (&BufferLock);

        // Re-test the predicate after every wake-up: the wait releases the
        // lock, so the queue state may have changed before we own it again.
        while (QueueSize == 0 && StopRequested == FALSE)
        {
            // Buffer is empty - sleep so producers can create items.
            SleepConditionVariableCS (&BufferNotEmpty, &BufferLock, INFINITE);
        }

        // Exit only once the queue has been drained.
        if (StopRequested == TRUE && QueueSize == 0)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Consume the first available item.
        LONG Item = Buffer[QueueStartOffset];
        QueueSize--;
        QueueStartOffset++;
        TotalItemsConsumed++;

        // Wrap the read position around the end of the circular buffer.
        if (QueueStartOffset == BUFFER_SIZE)
        {
            QueueStartOffset = 0;
        }

        // ULONG is unsigned long and LONG is long, hence %lu / %ld.
        printf ("Consumer %lu: item %2ld, queue size %2lu\r\n",
                ConsumerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a producer is waiting, wake it.
        WakeConditionVariable (&BufferNotFull);

        // Simulate processing of the item.
        Sleep (rand() % CONSUMER_SLEEP_TIME_MS);
    }

    printf ("Consumer %lu exiting\r\n", ConsumerId);
    return 0;
}
int main ( void )
{
InitializeConditionVariable (&BufferNotEmpty);
InitializeConditionVariable (&BufferNotFull);
InitializeCriticalSection (&BufferLock);
DWORD id;
HANDLE hProducer1 = CreateThread (NULL, 0, ProducerThreadProc, (PVOID)1, 0, &id);
HANDLE hConsumer1 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)1, 0, &id);
HANDLE hConsumer2 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)2, 0, &id);
puts ("Press enter to stop...");
getchar();
EnterCriticalSection (&BufferLock);
StopRequested = TRUE;
LeaveCriticalSection (&BufferLock);
WakeAllConditionVariable (&BufferNotFull);
WakeAllConditionVariable (&BufferNotEmpty);
WaitForSingleObject (hProducer1, INFINITE);
WaitForSingleObject (hConsumer1, INFINITE);
WaitForSingleObject (hConsumer2, INFINITE);
printf ("TotalItemsProduced: %u, TotalItemsConsumed: %u\r\n",
TotalItemsProduced, TotalItemsConsumed);
return 0;
}
关键段比较简单,调用函数如下:
VOID InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 初始化一个关键代码段
VOID EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 获取关键代码段的访问权
VOID LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 释放关键代码段的访问权
VOID DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection); // 删除关键代码段
条件变量的介绍:
Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. Condition variables are user-mode objects that cannot be shared across processes.
Condition variables enable threads to atomically release a lock and enter the sleeping state. They can be used with critical sections or slim reader/writer (SRW) locks. Condition variables support operations that "wake one" or "wake all" waiting threads.
After a thread is woken, it re-acquires the lock it released when the thread entered the sleeping state.
Windows Server 2003 and Windows XP: Condition variables are not supported.
The following are the condition variable functions.
Condition variable function | Description |
---|---|
InitializeConditionVariable | Initializes a condition variable. |
SleepConditionVariableCS | Sleeps on the specified condition variable and releases the specified critical section as an atomic operation. |
SleepConditionVariableSRW | Sleeps on the specified condition variable and releases the specified SRW lock as an atomic operation. |
WakeAllConditionVariable | Wakes all threads waiting on the specified condition variable. |
WakeConditionVariable | Wakes a single thread waiting on the specified condition variable. |
The following pseudocode demonstrates the typical usage pattern of condition variables.
CRITICAL_SECTION CritSection; CONDITION_VARIABLE ConditionVar; void PerformOperationOnSharedData() { EnterCriticalSection(&CritSection); // Wait until the predicate is TRUE while( TestPredicate() == FALSE ) { SleepConditionVariableCS(&ConditionVar, &CritSection, INFINITE); } // The data can be changed safely because we own the critical // section and the predicate is TRUE ChangeSharedData(); LeaveCriticalSection(&CritSection); // If necessary, signal the condition variable by calling // WakeConditionVariable or WakeAllConditionVariable so other // threads can wake }
The following code implements a producer/consumer queue. The queue is represented as a bounded circular buffer, and is protected by a critical section. The code uses two condition variables: one used by producers (BufferNotFull) and one used by consumers (BufferNotEmpty).
The code calls the InitializeConditionVariable function to create the condition variables. The consumer threads call the SleepConditionVariableCS function to wait for items to be added to the queue and the WakeConditionVariable function to signal the producer that it is ready for more items. The producer threads call SleepConditionVariableCS to wait for the consumer to remove items from the queue and WakeConditionVariable to signal the consumer that there are more items in the queue.
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
// Capacity of the circular queue, in items.
#define BUFFER_SIZE 10
// Exclusive upper bound, in milliseconds, of the random delay a producer
// sleeps (rand() % value) before creating each item.
#define PRODUCER_SLEEP_TIME_MS 500
// Exclusive upper bound, in milliseconds, of the random delay a consumer
// sleeps (rand() % value) after removing each item.
#define CONSUMER_SLEEP_TIME_MS 2000
// Storage for the circular queue of produced items.
LONG Buffer[BUFFER_SIZE];
// Most recently issued item number; producers advance it with
// InterlockedIncrement.
LONG LastItemProduced;
// Number of items currently stored in Buffer.
ULONG QueueSize;
// Index in Buffer of the oldest item, i.e. the next one to be consumed.
ULONG QueueStartOffset;
// Running totals of inserted and removed items; main prints them at exit.
ULONG TotalItemsProduced;
ULONG TotalItemsConsumed;
// Woken by producers after an insert; consumers sleep on it while the queue
// is empty.
CONDITION_VARIABLE BufferNotEmpty;
// Woken by consumers after a removal; producers sleep on it while the queue
// is full.
CONDITION_VARIABLE BufferNotFull;
// Held by the worker threads while they read or modify the queue state and
// the totals above.
CRITICAL_SECTION BufferLock;
// Set to TRUE by main (while holding BufferLock) to ask the workers to exit.
BOOL StopRequested;
/*
 * Producer thread entry point.
 *
 * Repeatedly sleeps for a random interval, takes the next item number and
 * appends it to the shared circular queue, waiting on BufferNotFull while
 * the queue is full. Exits as soon as StopRequested is observed under
 * BufferLock; an item number taken just before the stop is discarded.
 *
 * p:       producer id, passed as an integer stored in the pointer value.
 * returns: always 0.
 */
DWORD WINAPI ProducerThreadProc (PVOID p)
{
    ULONG ProducerId = (ULONG)(ULONG_PTR)p;

    for (;;)
    {
        // Simulate the time needed to produce a new item.
        Sleep (rand() % PRODUCER_SLEEP_TIME_MS);

        // InterlockedIncrement returns LONG, matching the Buffer element type.
        LONG Item = InterlockedIncrement (&LastItemProduced);

        EnterCriticalSection (&BufferLock);

        // Re-test the predicate after every wake-up: the wait releases the
        // lock, so the queue state may have changed before we own it again.
        while (QueueSize == BUFFER_SIZE && StopRequested == FALSE)
        {
            // Buffer is full - sleep so consumers can get items.
            SleepConditionVariableCS (&BufferNotFull, &BufferLock, INFINITE);
        }

        if (StopRequested == TRUE)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Insert the item at the end of the queue and increment size.
        Buffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item;
        QueueSize++;
        TotalItemsProduced++;

        // ULONG is unsigned long and LONG is long, hence %lu / %ld.
        printf ("Producer %lu: item %2ld, queue size %2lu\r\n",
                ProducerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a consumer is waiting, wake it.
        WakeConditionVariable (&BufferNotEmpty);
    }

    printf ("Producer %lu exiting\r\n", ProducerId);
    return 0;
}
/*
 * Consumer thread entry point.
 *
 * Repeatedly removes the oldest item from the shared circular queue,
 * waiting on BufferNotEmpty while the queue is empty, then sleeps for a
 * random interval to simulate processing. After a stop request it keeps
 * consuming until the queue is empty and only then exits.
 *
 * p:       consumer id, passed as an integer stored in the pointer value.
 * returns: always 0.
 */
DWORD WINAPI ConsumerThreadProc (PVOID p)
{
    ULONG ConsumerId = (ULONG)(ULONG_PTR)p;

    for (;;)
    {
        EnterCriticalSection (&BufferLock);

        // Re-test the predicate after every wake-up: the wait releases the
        // lock, so the queue state may have changed before we own it again.
        while (QueueSize == 0 && StopRequested == FALSE)
        {
            // Buffer is empty - sleep so producers can create items.
            SleepConditionVariableCS (&BufferNotEmpty, &BufferLock, INFINITE);
        }

        // Exit only once the queue has been drained.
        if (StopRequested == TRUE && QueueSize == 0)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Consume the first available item.
        LONG Item = Buffer[QueueStartOffset];
        QueueSize--;
        QueueStartOffset++;
        TotalItemsConsumed++;

        // Wrap the read position around the end of the circular buffer.
        if (QueueStartOffset == BUFFER_SIZE)
        {
            QueueStartOffset = 0;
        }

        // ULONG is unsigned long and LONG is long, hence %lu / %ld.
        printf ("Consumer %lu: item %2ld, queue size %2lu\r\n",
                ConsumerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a producer is waiting, wake it.
        WakeConditionVariable (&BufferNotFull);

        // Simulate processing of the item.
        Sleep (rand() % CONSUMER_SLEEP_TIME_MS);
    }

    printf ("Consumer %lu exiting\r\n", ConsumerId);
    return 0;
}
int main ( void )
{
InitializeConditionVariable (&BufferNotEmpty);
InitializeConditionVariable (&BufferNotFull);
InitializeCriticalSection (&BufferLock);
DWORD id;
HANDLE hProducer1 = CreateThread (NULL, 0, ProducerThreadProc, (PVOID)1, 0, &id);
HANDLE hConsumer1 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)1, 0, &id);
HANDLE hConsumer2 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)2, 0, &id);
puts ("Press enter to stop...");
getchar();
EnterCriticalSection (&BufferLock);
StopRequested = TRUE;
LeaveCriticalSection (&BufferLock);
WakeAllConditionVariable (&BufferNotFull);
WakeAllConditionVariable (&BufferNotEmpty);
WaitForSingleObject (hProducer1, INFINITE);
WaitForSingleObject (hConsumer1, INFINITE);
WaitForSingleObject (hConsumer2, INFINITE);
printf ("TotalItemsProduced: %u, TotalItemsConsumed: %u\r\n",
TotalItemsProduced, TotalItemsConsumed);
return 0;
}
相关文章推荐
- 使用Win32 API实现生产者消费者线程同步
- POSIX 使用互斥量和条件变量实现生产者/消费者问题
- 使用Win32 API实现生产者消费者线程同步
- 并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题
- 线程同步2 ------ 条件变量实现“生产者-消费者”续
- 【Linux】线程总结:线程同步 -互斥锁,条件变量,信号量实现多生产者多消费者模型
- 并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题
- 使用信号量和关键区来实现生产者消费者
- 使用 Lock 与Condition 实现生产者消费者
- 使用semaphore实现生产者-消费者简单模型
- 使用关键代码段实现线程同步
- 使用Lock来实现生产者和消费者问题
- 从java多线程实现“生产者-消费者”模型来谈谈操作系统中线程状态的转换及线程同步的总结
- 生产者-消费者问题实现 (windows)
- 在python中实现生产者和消费者的例子(三):使用fork和pipe
- 经典线程同步问题(生产者&消费者)--Java实现
- 使用Lock来实现生产者和消费者问题
- 使用关键代码段实现线程同步
- 经典线程同步问题(生产者&消费者)--Java实现
- 在python中实现生产者和消费者的例子(一):使用multiprocessing和pipe()