Lock-Free的栈实现及与加锁实现的性能对比
2011-12-21 12:28
483 查看
在编写多线程程序时,临界资源的处理常常需要互斥量、读写锁等来加以保护。这时需要考虑锁的粒度问题,粒度太粗,会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微;如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。除此之外,还要细致的考虑各种dead lock问题。
因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:
接下来给出有锁和无锁的栈实现,以及最后的性能测试数据。
有锁实现的栈:
lock_stack.h
lock_stack.c
无锁实现的栈:
lock_free_stack.h
lock_free_stack.c
主程序使用100个线程,每个线程对同一栈进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下:
makefile文件如下:
得到最终的测试数据如下所示:
相比较而言,在性能上lock_free_stack有微弱的优势。
注:
1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;
2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;
3)
参考资料:
[1]搜狗实验室 C10K与高性能程序续篇 http://www.sogou.com/labs/report/4-2.pdf
[2]W.Richard Stevens Advanced Programming in the Unix Environment 11.6节 线程同步
[3]设计不适用互斥锁的并发数据结构 http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/
[4]解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误 /article/3776532.html
[5]compare_and_swap http://en.wikipedia.org/wiki/Compare-and-swap
因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:
bool CAS(inptr_t* addr, intptr_t oldv, intptr_t newv) atomically{ if(*addr == oldv) { *addr = newv; return true; } else { return false } }
接下来给出有锁和无锁的栈实现,以及最后的性能测试数据。
有锁实现的栈:
lock_stack.h
#ifndef __LOCK_STACK_H #define __LOCK_STACK_H #include<pthread.h> typedef int value_t; struct cell { struct cell *next; value_t value; }; struct stack { struct cell *top; pthread_mutex_t lock; }; struct stack *stack_alloc(void); void stack_push(struct stack *fp,struct cell *cl); struct cell *stack_pop(struct stack *fp); #endif
lock_stack.c
#include "lock_stack.h" #include <pthread.h> #include <stdlib.h> struct stack *stack_alloc(void) { struct stack *fp; if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL) { fp->top = NULL; if(pthread_mutex_init(&fp->lock,NULL) != 0) { free(fp); return NULL; } } return fp; } void stack_push(struct stack *fp,struct cell *cl) { pthread_mutex_lock(&fp->lock); cl->next = fp->top; fp->top = cl; pthread_mutex_unlock(&fp->lock); } struct cell *stack_pop(struct stack *fp) { cell *head; pthread_mutex_lock(&fp->lock); head = fp->top; if(head == NULL) { pthread_mutex_unlock(&fp->lock); return head; } fp->top = head->next; pthread_mutex_unlock(&fp->lock); }
无锁实现的栈:
lock_free_stack.h
#ifndef __LOCK_FREE_STACK_H #define __LOCK_FREE_STACK_H #include<pthread.h> typedef int value_t; struct cell { struct cell *next; value_t value; }; struct stack { struct cell *top; }; struct stack *stack_alloc(void); void stack_push(struct stack *fp,struct cell *cl); struct cell *stack_pop(struct stack *fp); #endif
lock_free_stack.c
#include "lock_free_stack.h" #include <pthread.h> #include <stdlib.h> struct stack *stack_alloc(void) { struct stack *fp; if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL) { fp->top = NULL; } return fp; } void stack_push(struct stack *fp,struct cell *cl) { do{ cl->next = fp->top; }while(!__sync_bool_compare_and_swap(&fp->top,cl->next,cl)); } struct cell *stack_pop(struct stack *fp) { cell *head,*next; do { head = fp->top; if(head == NULL) { break; } next = head->next; }while(!__sync_bool_compare_and_swap(&fp->top,head,next)); return head; }
主程序使用100个线程,每个线程对同一栈进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下:
#ifdef __LOCK_VERSION #include "lock_stack.h" #endif #ifdef __LOCK_FREE_VERSION #include "lock_free_stack.h" #endif #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <time.h> void* thr_fn(void *fp) { time_t t1 = time(NULL); for(int i = 0; i != 500000; i++) { int r = rand()%2; struct cell *cl; if(r == 0) { if((cl = (struct cell*)malloc(sizeof(struct cell))) != NULL) { stack_push((struct stack*)fp,cl); } printf("push!\n"); } else { cl = stack_pop((struct stack*)fp); if(cl != NULL) { //free(cl); } printf("pop!\n"); } } printf("cost time: %d\n",time(NULL)-t1); return NULL; } int main() { struct stack *fp; fp = stack_alloc(); pthread_t tid; for(int i = 0; i < 100; i++) { pthread_create(&tid,NULL,thr_fn,(void *)fp); } sleep(1000); return 0; }
makefile文件如下:
lock_free: g++ lock_free_stack.c main.c -lpthread -D__LOCK_FREE_VERSION -march=nocona -o lock_free lock: g++ lock_stack.c main.c -lpthread -D__LOCK_VERSION -o lock clean: rm lock lock_free
得到最终的测试数据如下所示:
相比较而言,在性能上lock_free_stack有微弱的优势。
注:
1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;
2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;
3)
__sync_bool_compare_and_swap是由GCC提供的内置函数(GCC 4.1或更高版本),编译时需要指定-march参数;
参考资料:
[1]搜狗实验室 C10K与高性能程序续篇 http://www.sogou.com/labs/report/4-2.pdf
[2]W.Richard Stevens Advanced Programming in the Unix Environment 11.6节 线程同步
[3]设计不适用互斥锁的并发数据结构 http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/
[4]解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误 /article/3776532.html
[5]compare_and_swap http://en.wikipedia.org/wiki/Compare-and-swap
相关文章推荐
- evpp性能测试(3): 对无锁队列boost::lockfree::queue和moodycamel::ConcurrentQueue做一个性能对比测试
- 采用Shell编程实现随机函数(包括整数、符点数、字符、日期时间),顺便与AWK实现作下性能对比
- ReentrantLock和内部锁的性能对比(update)
- MemoryPool的LockFree实现
- java集合框架之Set集合实现类性能对比
- synchronize实现同步锁的对比二:当前字节码文件代码块的lock与静态代码块同步锁对比
- 某接口3种实现方案性能压测结果对比及分析
- Apworks框架中各种仓储实现的性能基准测试与结果对比
- MemoryPool的LockFree实现
- 选择,插入,希尔,快速,堆,归并排序六种排序方式的Java 实现和性能对比(付代码)
- PHP排序算法实现 与sort性能对比
- 高斯模糊效果实现方案及性能对比
- php实现斐波那契数列的三种方法,递归回调和迭代器和数组之间性能对比
- 高斯模糊效果实现方案及性能对比
- 【数据结构】常见的排序方法的实现以及性能对比
- 通过稀疏扩展信息滤波实现SLAM,并与EKF-SLAM进行性能对比
- 带回收功能的lockfree queue的java实现
- ReentrantLock和内部锁的性能对比
- 使用httpclient实现http链接池与使用HttpURLConnection发送http请求的方法与性能对比
- 多线程无锁(lock-free)队列(queue)的实现探讨