您的位置:首页 > 编程语言 > C语言/C++

C++ 无锁队列 ABA <3>

2016-05-20 13:55 477 查看
本文给出使用数组实现的无锁队列。

注意:本文未考虑伪共享   cache line对多线程无锁队列的影响

本文使用到的技术:

1、内存屏障. lfence:读内存屏障,sfence:写内存屏障,mfence:读写内存屏障, _ReadBarrier、_WriteBarrier
和_ReadWriteBarrier据说已经弃用,应使用atomic_thread_fence?

2、CAS原子操作

队列 及 测试代码如下:有空再写下考虑cache line的无锁数组队列

#include <stdio.h>
#include <process.h>
#include <windows.h>
#include <vector>
#include <iostream>
using namespace std;

//#define CAS(a,b,c) (InterlockedCompareExchangePointer((PVOID*)a,(PVOID)c,(PVOID)b) == b)
#define CAS(a,b,c) (InterlockedCompareExchange((volatile LONG*)a,(LONG)c,(LONG)b) == b)
#define maxsize (1024*32)  //为2的幂:方便求余操作转换为&运算,提高效率
#define MEM_BARRIER() 	__asm {mfence} //内存屏障(多核CPU需考虑)

CRITICAL_SECTION g_cs;

struct Queue
{
DWORD head;       //头索引
DWORD tail;       //尾索引
DWORD maxtail;    //实际的最大尾索引
DWORD size;
void * buf[maxsize];
};

void InitQueue(Queue *queue)
{
queue->head = 0;
queue->tail = 0;
queue->maxtail = 0;
queue->size = maxsize;
}

bool Enqueue(Queue *queue, void *data)
{
DWORD head,tail,next;
while (true)
{
head = queue->head;
tail = queue->tail;
next = tail + 1;

if (next - head > queue->size)
return false;
if (CAS(&queue->tail, tail, next))
break;
}
queue->buf[tail&(queue->size - 1)] = data;
MEM_BARRIER();
while(!CAS(&queue->maxtail,tail,next))
_mm_pause()/*Sleep(0)*/; //linux:sched_yield()
return true;
}

void* Dequeue(Queue *queue)
{
DWORD head, tail, next;
void *data;

while (true)
{
head = queue->head;
tail = queue->maxtail;
next = head + 1;

if (head == tail)
return nullptr;

data = queue->buf[head&(queue->size-1)];
MEM_BARRIER();
if (CAS(&queue->head, head, next))
break;
}
return data;
}
/////////////////////////////////////测试代码///////////////////////////////////////////////////
struct Test
{
int id;   //线程索引
int num;  //每个线程生产元素个数
Queue *queue;
HANDLE event;
int count;//每个消费者线程消费的个数
};
unsigned int __stdcall EnqueueThread(void *data)
{
Test *test = (Test*)data;
Queue *queue = test->queue;
int num = test->num;
int start = num * (test->id + 1);
int end = start - num;
::SetEvent(test->event);

while (start > end)
{
int *a = new int;
*a = start;
if (!Enqueue(queue, a))
{
delete a;
//cout << "?????" << endl;
}
start--;
}
return 0;
}

unsigned int __stdcall DequeueThread(void *data)
{
Test *test = (Test*)data;
Queue *queue = test->queue;
int id = test->id;
int &count = test->count;
::SetEvent(test->event);
int null_num = 0, num = 0;
while (true)
{
int * ptr = (int*)Dequeue(queue);
if (ptr == nullptr)
null_num++;
else
{
delete ptr; ptr = nullptr;
num++;
}
if (null_num == 7000) break;
}
//Sleep(100);
EnterCriticalSection(&g_cs);
cout << "该消费者线程索引=" << id << ",退出时消费元素个数为 num=" << num << " null_num=" << null_num << endl;
LeaveCriticalSection(&g_cs);
count = num;
return 0;
}

void test()
{
Queue *queue = new Queue;
InitQueue(queue);
HANDLE hThreadEvent = ::CreateEvent(NULL, 0, 0, NULL);

Test test;
test.queue = queue;
test.event = hThreadEvent;
test.num = 10000;    //单个生产者线程生产的元素数量
test.count = 0;

#define  num1 2
#define  num2 2
HANDLE hBuf1[num1];
HANDLE hBuf2[num2];

cout << "共有" << num1 << "个生产者线程, " << " 共生产出元素个数为:" << num1*test.num << endl;
int i = 0;
for (; i < num1; i++)
{
Test t = test;
t.id = i;
hBuf1[i] = (HANDLE)_beginthreadex(NULL, 0, EnqueueThread, &t, 0, NULL);
::WaitForSingleObject(hThreadEvent, INFINITE);
::ResetEvent(hThreadEvent);
}

cout << "共有" << num2 << "个消费者线程" << endl;
vector<Test*> vecTest;
for (int j = i, k = 0; j < i + num2; j++, k++)
{
Test *t = new Test;
*t = test;
t->id = j;
vecTest.push_back(t);
hBuf2[k] = (HANDLE)_beginthreadex(NULL, 0, DequeueThread, vecTest[k], 0, NULL);
::WaitForSingleObject(hThreadEvent, INFINITE);
::ResetEvent(hThreadEvent);
}

cout << "稍等..." << endl;
WaitForMultipleObjects(num1, hBuf1, TRUE, INFINITE);
WaitForMultipleObjects(num2, hBuf2, TRUE, INFINITE);
//Sleep(test.num / 100);
int sum = 0;
for (int j = 0; j < vecTest.size(); j++)
{
sum += vecTest[j]->count;
}
cout << "--------所有消费者线程共消耗的元素为:" << sum << endl;
cout << endl;
cout << "请查看生成的元素是否刚好等于消耗的元素!!!如果相等调整null_num值再试试!" << endl;

//释放资源
for (auto it = vecTest.begin(); it != vecTest.end(); it++)
{
if (NULL != *it)
{
delete *it;
*it = NULL;
}
}
delete queue;
vecTest.clear();
CloseHandle(hThreadEvent);
}

int main()
{
InitializeCriticalSection(&g_cs);
int num = 1000;
for (int i = 0; i < num; i++)
{
cout << "--------------共" << num << "次循环---------第【 " << i + 1 << " 】次循环.-------------------" << endl;
cout << endl;
test();
cout << endl;
}
;
Sleep(30000);
system("pause");
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: