C++ 无锁队列 ABA <3>
2016-05-20 13:55
477 查看
本文给出使用数组实现的无锁队列。
注意:本文未考虑伪共享 cache line对多线程无锁队列的影响
本文使用到的技术:
1、内存屏障. lfence:读内存屏障,sfence:写内存屏障,mfence:读写内存屏障, _ReadBarrier、_WriteBarrier
和_ReadWriteBarrier据说已经弃用,应使用atomic_thread_fence?
2、CAS原子操作
队列 及 测试代码如下:有空再写下考虑cache line的无锁数组队列
注意:本文未考虑伪共享 cache line对多线程无锁队列的影响
本文使用到的技术:
1、内存屏障. lfence:读内存屏障,sfence:写内存屏障,mfence:读写内存屏障, _ReadBarrier、_WriteBarrier
和_ReadWriteBarrier据说已经弃用,应使用atomic_thread_fence?
2、CAS原子操作
队列 及 测试代码如下:有空再写下考虑cache line的无锁数组队列
#include <stdio.h> #include <process.h> #include <windows.h> #include <vector> #include <iostream> using namespace std; //#define CAS(a,b,c) (InterlockedCompareExchangePointer((PVOID*)a,(PVOID)c,(PVOID)b) == b) #define CAS(a,b,c) (InterlockedCompareExchange((volatile LONG*)a,(LONG)c,(LONG)b) == b) #define maxsize (1024*32) //为2的幂:方便求余操作转换为&运算,提高效率 #define MEM_BARRIER() __asm {mfence} //内存屏障(多核CPU需考虑) CRITICAL_SECTION g_cs; struct Queue { DWORD head; //头索引 DWORD tail; //尾索引 DWORD maxtail; //实际的最大尾索引 DWORD size; void * buf[maxsize]; }; void InitQueue(Queue *queue) { queue->head = 0; queue->tail = 0; queue->maxtail = 0; queue->size = maxsize; } bool Enqueue(Queue *queue, void *data) { DWORD head,tail,next; while (true) { head = queue->head; tail = queue->tail; next = tail + 1; if (next - head > queue->size) return false; if (CAS(&queue->tail, tail, next)) break; } queue->buf[tail&(queue->size - 1)] = data; MEM_BARRIER(); while(!CAS(&queue->maxtail,tail,next)) _mm_pause()/*Sleep(0)*/; //linux:sched_yield() return true; } void* Dequeue(Queue *queue) { DWORD head, tail, next; void *data; while (true) { head = queue->head; tail = queue->maxtail; next = head + 1; if (head == tail) return nullptr; data = queue->buf[head&(queue->size-1)]; MEM_BARRIER(); if (CAS(&queue->head, head, next)) break; } return data; } /////////////////////////////////////测试代码/////////////////////////////////////////////////// struct Test { int id; //线程索引 int num; //每个线程生产元素个数 Queue *queue; HANDLE event; int count;//每个消费者线程消费的个数 }; unsigned int __stdcall EnqueueThread(void *data) { Test *test = (Test*)data; Queue *queue = test->queue; int num = test->num; int start = num * (test->id + 1); int end = start - num; ::SetEvent(test->event); while (start > end) { int *a = new int; *a = start; if (!Enqueue(queue, a)) { delete a; //cout << "?????" << endl; } start--; } return 0; } unsigned int __stdcall DequeueThread(void *data) { Test *test = (Test*)data; Queue *queue = test->queue; int id = test->id; int &count = test->count; ::SetEvent(test->event); int null_num = 0, num = 0; while (true) { int * ptr = (int*)Dequeue(queue); if (ptr == nullptr) null_num++; else { delete ptr; ptr = nullptr; num++; } if (null_num == 7000) break; } //Sleep(100); EnterCriticalSection(&g_cs); cout << "该消费者线程索引=" << id << ",退出时消费元素个数为 num=" << num << " null_num=" << null_num << endl; LeaveCriticalSection(&g_cs); count = num; return 0; } void test() { Queue *queue = new Queue; InitQueue(queue); HANDLE hThreadEvent = ::CreateEvent(NULL, 0, 0, NULL); Test test; test.queue = queue; test.event = hThreadEvent; test.num = 10000; //单个生产者线程生产的元素数量 test.count = 0; #define num1 2 #define num2 2 HANDLE hBuf1[num1]; HANDLE hBuf2[num2]; cout << "共有" << num1 << "个生产者线程, " << " 共生产出元素个数为:" << num1*test.num << endl; int i = 0; for (; i < num1; i++) { Test t = test; t.id = i; hBuf1[i] = (HANDLE)_beginthreadex(NULL, 0, EnqueueThread, &t, 0, NULL); ::WaitForSingleObject(hThreadEvent, INFINITE); ::ResetEvent(hThreadEvent); } cout << "共有" << num2 << "个消费者线程" << endl; vector<Test*> vecTest; for (int j = i, k = 0; j < i + num2; j++, k++) { Test *t = new Test; *t = test; t->id = j; vecTest.push_back(t); hBuf2[k] = (HANDLE)_beginthreadex(NULL, 0, DequeueThread, vecTest[k], 0, NULL); ::WaitForSingleObject(hThreadEvent, INFINITE); ::ResetEvent(hThreadEvent); } cout << "稍等..." << endl; WaitForMultipleObjects(num1, hBuf1, TRUE, INFINITE); WaitForMultipleObjects(num2, hBuf2, TRUE, INFINITE); //Sleep(test.num / 100); int sum = 0; for (int j = 0; j < vecTest.size(); j++) { sum += vecTest[j]->count; } cout << "--------所有消费者线程共消耗的元素为:" << sum << endl; cout << endl; cout << "请查看生成的元素是否刚好等于消耗的元素!!!如果相等调整null_num值再试试!" << endl; //释放资源 for (auto it = vecTest.begin(); it != vecTest.end(); it++) { if (NULL != *it) { delete *it; *it = NULL; } } delete queue; vecTest.clear(); CloseHandle(hThreadEvent); } int main() { InitializeCriticalSection(&g_cs); int num = 1000; for (int i = 0; i < num; i++) { cout << "--------------共" << num << "次循环---------第【 " << i + 1 << " 】次循环.-------------------" << endl; cout << endl; test(); cout << endl; } ; Sleep(30000); system("pause"); return 0; }