您的位置:首页 > 编程语言 > C语言/C++

Windows Via C/C++:用户模式下的线程同步——Condition Variables 条件变量

2009-11-08 20:25 531 查看
注意,上一节讨论的Slim Reader-Writer Locks和本节讨论的条件变量(Condition Variable)是微软从Vista开始才提供的新同步对象

条件变量

你已经知道在同步生产者和消费者线程时,可以使用SRWLock以共享读或排它写的方式保护被访问的资源。试想一下,假如消费者线程获得锁之后发现缓冲区中没有可供消费的资源,它就应该释放锁并一直等待生产者线程向缓冲区内写入新的资源,同样,假如生产者线程获得锁之后发现缓冲区已满,它应该释放锁并等待缓冲区中有资源被消费者线程消费掉。条件变量(Condition Varables)可以简化处理上述同步场景时所需的工作,通过调用SleepConditionVariableCS或SleepConditionVariableSRW,当前线程可以自动离开临界区/释放锁并阻塞到条件变量满足后重新进入临界区/获得锁,一步完成LeaveCriticalSeciton/ReleaseSRWLockXXX、WaitXXX、EnterCriticalSection/AcquireSRWLockXXX所需的工作,并且保证这一过程是原子执行的。

BOOL SleepConditionVariableCS(

PCONDITION_VARIABLE pConditionVariable,

PCRITICAL_SECTION pCriticalSection,

DWORD dwMilliseconds);

BOOL SleepConditionVarableSRW(

PCONDITION_VARIABLE pConditionVariable,

PSRWLOCK pSRWLock,

DWORD dwMilliseconds,

ULONG Flags);

参数pConditionVariable是要等待的条件变量,第二个参数指向要释放并在条件变量满足后重新进入的临界区/SRWLock,dwMilliseconds设置函数等待的超时时间,注意SleepConditionVarableSRW还有一个Flags参数,当需要以排它方式访问SRWLock时应该传递0,否则将其设置为CONDITION_VARIABLE_LOCKMODE_SHARED。如果在dwMilliseoncds时间内函数指定的条件变量仍不满足,函数会返回FALSE,此时函数并没有重新获取已释放的临界区/SRWLock。

在SleepConditionVariable*调用内阻塞的线程会被别的线程调用WakeConditionVariable/WakeAllConditionVariable唤醒。Wake*函数表示相应的条件变量已经满足,这两个函数的签名没有太大区别:

VOID WakeConditionVariable(

PCONDITION_VARIABLE pConditionVariable);

VOID WakeAllConditionVariable(

PCONDITION_VARIABLE pConditionVariable);

WakeConditionVariable调用只会唤醒所有等待pConditionVariable线程中的一个,该线程将重新获得SleepConditonVariable*内释放的临界区/SRWLock,并且当该线程将其释放时,其它等待的线程并不会被唤醒。而WakeAllConditionVariable则有所不同会唤醒所有调用SleepConditionVariableSRW(...,CONDITION_VARIABLE_LOCKMODE_SHARED)阻塞的线程,或是以排它方式等待条件变量的某一线程(SRWLock写线程和使用临界区的线程),当被唤醒的线程释放临界区/锁之后,其它阻塞的线程会被自动唤醒并重新竞争调度,直到所有阻塞的线程都成功的获取锁为止。

使用条件变量的例子

本节并没有翻译原文中的Queue Sample Application,而是用MSDN中的比较直观的例子

下面的代码使用临界区和条件变量模拟一个消费者/生产者过程,缓冲区使用循环数组,临界区用来保护该缓冲区,而两个条件变量(BufferNotFull和BufferNotEmpty)则分别用于生产者线程和消费者线程,以下是全部代码:

#include

#include

#include

#define BUFFER_SIZE 10  // 缓冲区数组大小

#define PRODUCER_SLEEP_TIME_MS 500  // 生产者进程睡眠时间

#define CONSUMER_SLEEP_TIME_MS 2000  // 消费者进程睡眠时间

LONG Buffer[BUFFER_SIZE];  // 缓冲区数组

LONG LastItemProduced;  // 最后一个产品的编号

ULONG QueueSize;  // 当前缓冲区中元素的数目

ULONG QueueStartOffSet;  // 当前缓冲区循环队列的起始下标

ULONG TotalItemsProduced;  // 生产者线程产生的所有元素的数目

ULONG TotalItemsConsumed;  // 消费者线程消费的所有元素的数目

CONDITION_VARIABLE BufferNotEmpty;

CONDITION_VARIABLE BufferNotFull;

CRITICAL_SECTION BufferLock;

BOOL StopRequested;  // 结束标志

DWORD WINAPI ProducerThreadProc(PVOID p);

DWORD WINAPI ConsumerThreadProc(PVOID p);

void _cdecl wmain(int argc, const wchar_t* argv[])

{

InitializeConditionVariable(&BufferNotEmpty);

InitializeConditionVariable(&BufferNotFull);

InitializeCriticalSection(&BufferLock);

DWORD id;

HANDLE hProducer1 = CreateThread(NULL, 0, ProducerThreadProc, (PVOID)1, &id);

HANDLE hConsumer1 = CreateThread(NULL, 0, ConsumerThreadProc, (PVOID)1, &id);

HANDLE hConsumer2 = CreateThread(NULL, 0, ConsumerThreadProc, (PVOID)2, &id);

puts("Press enter to stop...");

getchar();

EnterCriticalSection(&BufferLock);

StopRequested = TRUE;

LeaveCriticalSection(&BufferLock);

WakeAllConditionVariable(&BufferNotFull);

WakeAllConditionVariable(&BufferNotEmpty);

WaitForSingleObject(hProducer1, INFINITE);

WaitForSingleObject(hConsumer1, INFINITE);

WaitForSingleObject(hConsumer2, INFINITE);

printf("Total items produced:%u, total items consumed:%u",

TotalItemsProduced, TotalItemsConsumed);

}

// 生产者线程入口点函数

DWORD WINAPI ProducerThreadProc(PVOID p)

{

ULONG ProcuderId = (ULONG)(ULONG_PTR)p;

while(true){

// Simulate producing a new item

Sleep(rand()%PROCUDER_SLEEP_TIME_MS);

ULONG item = InterlockedIncrement(&LastItemProduced);

EnterCriticalSection(&BufferLock);

while(QueueSize == BUFFER_SIZE && StopRequested == FALSE)

{

SleepConditionVariableCS(&BufferNotFull, &BufferLock, INFINITE);

}

if(StopRequested == TRUE)

{

LeaveCiriticalSection(&BufferLock);

break;

}

// Insert the item at the end of the queue and increase size

Buffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item;

QueueSize ++;

TotalItemsProduced ++;

printf("Producer %u: item %2d, queue size %2u/r/n", ProducerId, item, QueueSize);

LeaveCriticalSection(&BufferLock);

WakeConditionVariable(&BufferNotEmpty);

}

printf("Producer %u exiting/r/n", ProducerId);

return 0;

}

// 消费者线程入口点函数

DWORD WINAPI ConsumerThreadProc(PVOID p)

{

ULONG ConsumerId = (ULONG)(ULONG_PTR)p;

while(true){

EnterCriticalSection(&BufferLock);

while(QueueSize == 0 && StopRequested == FALSE){

SleepConditionVariableCS(&BufferNotEmpty, &BufferLock, INFINITE);

}

if(StopRequested == TRUE && QueueSize == 0){

LeaveCriticalSection(&BufferLock);

break;

}

LONG item = Buffer[QueueStartOffset];

QueueSize --;

QueueStartOffset ++;

TotalItemConsumed ++;

if(QueueStartOffset == BUFFER_SIZE)

QueueStartOffset = 0;

printf("Consumer %u: item %2d, queue size %2u/r/n", consumerId, item, QueueSize);

LeaveCriticalSection(&BufferLock);

WakeConditionVariable(&BufferNotFull);

Sleep(rand()%CONSUMER_SLEEP_TIME_MS);

}

printf("Consumer %u exiting/r/n", consumerId);

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐