[c++]无锁队列
2015-10-01 11:17
225 查看
/* * lock free queue, thread safe * this paper "A practical nonblocking queue algorithm using compare-and-swap" provide the main idea * queue.hpp * enqueue into the tail * dequeue from head */ #ifndef __QUEUE_HPP__ #define __QUEUE_HPP__ typedef int int_t; typedef long long int_2t; #define sync_get_2t(val_ptr) __sync_fetch_and_add((val_ptr),0) #define sync_inc_t(val_ptr) __sync_fetch_and_add((val_ptr),1) #define sync_dec_t(val_ptr) __sync_fetch_and_sub((val_ptr),1) #define sync_cas_2t(val_ptr,old_val,new_val) __sync_bool_compare_and_swap((val_ptr),(old_val),(new_val)) #define int_2t_low_part_ptr(x) ((int_t*)(&(x))) #define int_2t_high_part_ptr(x) (((int_t*)(&(x)))+1) #define int_2t_low_part(x) (*(int_2t_low_part_ptr(x))) #define int_2t_high_part(x) (*(int_2t_high_part_ptr(x))) template<typename T> class queue { private: T *m_data_queue; int_2t *m_free_array; int_2t *m_data_array; int_2t m_free_head; int_2t m_free_tail; int_2t m_data_head; int_2t m_data_tail; const int_t m_capacity; int_t m_size; public: queue(const int_t size=1024):m_capacity(size+1) { if(m_capacity>1){ m_data_queue = new T[m_capacity-1]; m_free_array = new int_2t[m_capacity]; m_data_array = new int_2t[m_capacity]; int_2t_low_part(m_free_tail)=m_capacity-1; int_2t_high_part(m_free_tail)=0; m_free_head=0; m_data_head=0; m_data_tail=0; m_size=0; for(int i=0;i<m_capacity;++i){ int_2t_low_part(m_free_array[i])=i; int_2t_high_part(m_free_array[i])=0; int_2t_low_part(m_data_array[i])=-1; int_2t_high_part(m_data_array[i])=0; } int_2t_low_part(m_free_array[m_capacity-1])=-1; } } ~queue() { if(m_capacity>1){ delete [] m_data_queue; delete [] m_free_array; delete [] m_data_array; } } //enqueue one free node back to the tail bool enqueue_free(const T *p) { int_t index = p - m_data_queue; while(true){ int_2t tail = sync_get_2t(&m_free_tail); int_2t head = sync_get_2t(&m_free_head); int_t tail_ptr = int_2t_low_part(tail); int_t head_ptr = int_2t_low_part(head); int_t tail_next_ptr = tail_ptr+1; if(tail_next_ptr==m_capacity) tail_next_ptr=0; if(tail!=sync_get_2t(&m_free_tail)) continue; if(tail_next_ptr==head_ptr) return false; int_2t tail_ptr_old_val = sync_get_2t(&(m_free_array[tail_ptr])); int_t tail_ptr_old_val_ptr = int_2t_low_part(tail_ptr_old_val); if(tail_ptr_old_val_ptr==-1){ int_2t tail_ptr_new_val = tail_ptr_old_val; int_2t_low_part(tail_ptr_new_val) = index; int_2t_high_part(tail_ptr_new_val) = int_2t_high_part(tail_ptr_old_val)+1; if(sync_cas_2t(&(m_free_array[tail_ptr]),tail_ptr_old_val,tail_ptr_new_val)){ int_2t tail_new = tail; int_2t_low_part(tail_new)=tail_next_ptr; int_2t_high_part(tail_new)=int_2t_high_part(tail)+1; sync_cas_2t(&m_free_tail,tail,tail_new); return true; } } else{ int_2t tail_new = tail; int_2t_low_part(tail_new)=tail_next_ptr; int_2t_high_part(tail_new)=int_2t_high_part(tail)+1; sync_cas_2t(&m_free_tail,tail,tail_new); } } } //dequeue one free node from the head bool dequeue_free(T *(&p)) { while(true){ int_2t head = sync_get_2t(&m_free_head); int_2t tail = sync_get_2t(&m_free_tail); int_t head_ptr = int_2t_low_part(head); int_t tail_ptr = int_2t_low_part(tail); if(head!=sync_get_2t(&m_free_head)) continue; if(head_ptr==tail_ptr) return false; int_2t head_ptr_old_val = sync_get_2t(&(m_free_array[head_ptr])); int_t head_ptr_old_val_ptr = int_2t_low_part(head_ptr_old_val); if(head_ptr_old_val_ptr!=-1){ int_2t head_ptr_new_val = head_ptr_old_val; int_2t_low_part(head_ptr_new_val)=-1; int_2t_high_part(head_ptr_new_val)=int_2t_high_part(head_ptr_old_val)+1; if(sync_cas_2t(&(m_free_array[head_ptr]),head_ptr_old_val,head_ptr_new_val)){ int_2t head_new = head; int_t head_new_ptr = int_2t_low_part(head_new)+1; if(head_new_ptr==m_capacity) head_new_ptr=0; int_2t_low_part(head_new)=head_new_ptr; int_2t_high_part(head_new)=int_2t_high_part(head)+1; sync_cas_2t(&m_free_head,head,head_new); p=head_ptr_old_val_ptr + m_data_queue; return true; } } else{ int_2t head_new = head; int_t head_new_ptr = int_2t_low_part(head_new)+1; if(head_new_ptr==m_capacity) head_new_ptr=0; int_2t_low_part(head_new)=head_new_ptr; int_2t_high_part(head_new)=int_2t_high_part(head)+1; sync_cas_2t(&m_free_head,head,head_new); } } } //enqueue one data node back to the tail bool enqueue_data(const T *p) { int_t index = p - m_data_queue; while(true){ int_2t tail = sync_get_2t(&m_data_tail); int_2t head = sync_get_2t(&m_data_head); int_t tail_ptr = int_2t_low_part(tail); int_t head_ptr = int_2t_low_part(head); int_t tail_next_ptr = tail_ptr+1; if(tail_next_ptr==m_capacity) tail_next_ptr=0; if(tail!=sync_get_2t(&m_data_tail)) continue; if(tail_next_ptr==head_ptr) return false; int_2t tail_ptr_old_val = sync_get_2t(&(m_data_array[tail_ptr])); int_t tail_ptr_old_val_ptr = int_2t_low_part(tail_ptr_old_val); if(tail_ptr_old_val_ptr==-1){ int_2t tail_ptr_new_val = tail_ptr_old_val; int_2t_low_part(tail_ptr_new_val) = index; int_2t_high_part(tail_ptr_new_val) = int_2t_high_part(tail_ptr_old_val)+1; if(sync_cas_2t(&(m_data_array[tail_ptr]),tail_ptr_old_val,tail_ptr_new_val)){ int_2t tail_new = tail; int_2t_low_part(tail_new)=tail_next_ptr; int_2t_high_part(tail_new)=int_2t_high_part(tail)+1; sync_cas_2t(&m_data_tail,tail,tail_new); sync_inc_t(&m_size); return true; } } else{ int_2t tail_new = tail; int_2t_low_part(tail_new)=tail_next_ptr; int_2t_high_part(tail_new)=int_2t_high_part(tail)+1; sync_cas_2t(&m_data_tail,tail,tail_new); } } } //dequeue one data node from the head bool dequeue_data(T *(&p)) { while(true){ int_2t head = sync_get_2t(&m_data_head); int_2t tail = sync_get_2t(&m_data_tail); int_t head_ptr = int_2t_low_part(head); int_t tail_ptr = int_2t_low_part(tail); if(head!=sync_get_2t(&m_data_head)) continue; if(head_ptr==tail_ptr) return false; int_2t head_ptr_old_val = sync_get_2t(&(m_data_array[head_ptr])); int_t head_ptr_old_val_ptr = int_2t_low_part(head_ptr_old_val); if(head_ptr_old_val_ptr!=-1){ int_2t head_ptr_new_val = head_ptr_old_val; int_2t_low_part(head_ptr_new_val)=-1; int_2t_high_part(head_ptr_new_val)=int_2t_high_part(head_ptr_old_val)+1; if(sync_cas_2t(&(m_data_array[head_ptr]),head_ptr_old_val,head_ptr_new_val)){ int_2t head_new = head; int_t head_new_ptr = int_2t_low_part(head_new)+1; if(head_new_ptr==m_capacity) head_new_ptr=0; int_2t_low_part(head_new)=head_new_ptr; int_2t_high_part(head_new)=int_2t_high_part(head)+1; sync_cas_2t(&m_data_head,head,head_new); p=head_ptr_old_val_ptr + m_data_queue; sync_dec_t(&m_size); return true; } } else{ int_2t head_new = head; int_t head_new_ptr = int_2t_low_part(head_new)+1; if(head_new_ptr==m_capacity) head_new_ptr=0; int_2t_low_part(head_new)=head_new_ptr; int_2t_high_part(head_new)=int_2t_high_part(head)+1; sync_cas_2t(&m_data_head,head,head_new); } } } const int_t size(){ return m_size; } }; #endif
------------------------------------------
测试程序
-----------------------------------------
#include<iostream> #include<unistd.h> #include<pthread.h> #include"queue.hpp" using namespace std; #define QUEUE_SIZE 100 #define THREAD_NUM 100 #define LOOP_NUM 1000000 queue<long long> q(QUEUE_SIZE); long long total=1; void* producer(void*p) { long long *d; for(int i=0;i<LOOP_NUM;++i){ while(q.dequeue_free(d)==false); *d=__sync_fetch_and_add(&total,1); while(q.enqueue_data(d)==false); } } void* consumer(void*p) { long long*d; long long*cnt=(long long*)p; for(int i=0;i<LOOP_NUM;++i){ while(q.dequeue_data(d)==false); *cnt += *d; while(q.enqueue_free(d)==false); } } int main() { pthread_t threads[THREAD_NUM*2]; long long cnt[THREAD_NUM]; long long sum=0; long long expect=THREAD_NUM*LOOP_NUM; for(int i=0;i<THREAD_NUM;++i) cnt[i]=0; for(int i=0;i<THREAD_NUM;++i){ if(pthread_create(threads+i,NULL,producer,NULL)) cout << "Error at create thread " << i << endl; } for(int i=0;i<THREAD_NUM;++i){ if(pthread_create(threads+i+THREAD_NUM,NULL,consumer,cnt+i)) cout << "Error at create thread " << i+THREAD_NUM << endl; } for(int i=0;i<THREAD_NUM*2;++i){ if(pthread_join(threads[i],NULL)) cout << "Error at join thread " << i << endl; } for(int i=0;i<THREAD_NUM;++i){ sum+=cnt[i]; cout << i << "=" << cnt[i] << endl; } cout << "Actual Value : " << sum << endl; expect=(expect*(expect+1))/2; cout << "Expect Value : " << expect << endl; return 0; }
相关文章推荐
- C/C++ Volatile关键词深度剖析(转)
- 《C++primer(第五版)》学习之路-第十三章:拷贝控制
- 字符串匹配——C++使用Regex
- C++ Primer Plus 课后习题 第二章
- C++中引用
- typename的用法
- C++基础编程---1.3模糊概念
- C++ ——三大特性理解
- C++ - 继承与派生的概念
- 涉及虚函数的继承
- 转:DLL如何导出C++的类
- 用C语言程序实现黎曼和求定积分
- C++ 设计模式
- 生成8位26个字母和数字的全排列(密码字典,密钥)c++代码(非递归高效直接)
- leetcode笔记:Simplify Path
- ubuntu下如何编译C语言
- C++那些细节--const与函数
- C语言求两个数中最大公约数
- 一个简单地C语言程序展示RSA加密原理
- C++——Static变量初始化