您的位置:首页 > 其它

Lock-Free的栈实现及与加锁实现的性能对比

2011-12-21 12:28 483 查看
  在编写多线程程序时,临界资源的处理常常需要互斥量、读写锁等来加以保护。这时需要考虑锁的粒度问题,粒度太粗,会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微;如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得相当复杂。除此之外,还要细致的考虑各种dead lock问题。

  因此,对于某些关键数据结构(临界资源),可以考虑使用Lock Free的实现手段。一个Lock Free的程序能够确保执行它的所有线程至少有一个能够继续往下执行,从而免疫了死锁等问题。Lock Free算法需要对应的原子操作加以支持,比如CAS(compare-and-swap)及其变种。CAS实现的逻辑如下:

bool CAS(inptr_t* addr, intptr_t oldv, intptr_t newv)
atomically{
if(*addr == oldv)
{
*addr = newv;
return true;
}
else
    {
return false
}

}


接下来给出有锁和无锁的栈实现,以及最后的性能测试数据。

有锁实现的栈:

lock_stack.h

#ifndef __LOCK_STACK_H
#define __LOCK_STACK_H
#include<pthread.h>

typedef int value_t;

struct cell
{
struct cell *next;
value_t value;
};

struct stack
{
struct cell *top;
pthread_mutex_t lock;
};

struct stack *stack_alloc(void);
void stack_push(struct stack *fp,struct cell *cl);
struct cell *stack_pop(struct stack *fp);

#endif


  lock_stack.c

#include "lock_stack.h"
#include <pthread.h>
#include <stdlib.h>

struct stack *stack_alloc(void)
{
struct stack *fp;
if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
{
fp->top = NULL;
if(pthread_mutex_init(&fp->lock,NULL) != 0)
{
free(fp);
return NULL;
}
}
return fp;
}

void stack_push(struct stack *fp,struct cell *cl)
{
pthread_mutex_lock(&fp->lock);

cl->next = fp->top;
fp->top = cl;

pthread_mutex_unlock(&fp->lock);
}

struct cell *stack_pop(struct stack *fp)
{
cell *head;
pthread_mutex_lock(&fp->lock);

head = fp->top;
if(head == NULL)
{
pthread_mutex_unlock(&fp->lock);
return head;
}

fp->top = head->next;
pthread_mutex_unlock(&fp->lock);
}


  无锁实现的栈:

  lock_free_stack.h

#ifndef __LOCK_FREE_STACK_H
#define __LOCK_FREE_STACK_H
#include<pthread.h>

typedef int value_t;

struct cell
{
struct cell *next;
value_t value;
};

struct stack
{
struct cell *top;
};

struct stack *stack_alloc(void);
void stack_push(struct stack *fp,struct cell *cl);
struct cell *stack_pop(struct stack *fp);

#endif


  lock_free_stack.c

#include "lock_free_stack.h"
#include <pthread.h>
#include <stdlib.h>

struct stack *stack_alloc(void)
{
struct stack *fp;
if((fp = (struct stack*)malloc(sizeof(struct stack))) != NULL)
{
fp->top = NULL;
}
return fp;
}

void stack_push(struct stack *fp,struct cell *cl)
{
do{
cl->next = fp->top;
}while(!__sync_bool_compare_and_swap(&fp->top,cl->next,cl));
}

struct cell *stack_pop(struct stack *fp)
{
cell *head,*next;

do
{
head = fp->top;
if(head == NULL)
{
break;
}
next = head->next;
}while(!__sync_bool_compare_and_swap(&fp->top,head,next));
return head;
}


  主程序使用100个线程,每个线程对同一栈进行500000次随机的push/pop操作(具体进行某个操作rand确定),并记录了每个线程的执行时间,代码如下:

#ifdef __LOCK_VERSION
#include "lock_stack.h"
#endif
#ifdef __LOCK_FREE_VERSION
#include "lock_free_stack.h"
#endif
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <time.h>

void* thr_fn(void *fp)
{
time_t t1 = time(NULL);
for(int i = 0; i != 500000; i++)
{
int r = rand()%2;
struct cell *cl;

  if(r == 0)
  {
  if((cl = (struct cell*)malloc(sizeof(struct cell))) != NULL)
  {
    stack_push((struct stack*)fp,cl);
  }
  printf("push!\n");
  }
  else
  {
   cl = stack_pop((struct stack*)fp);
  if(cl != NULL)
  {
//free(cl);
  }
  printf("pop!\n");
}
}

printf("cost time: %d\n",time(NULL)-t1);

return NULL;
}

int main()
{
struct stack *fp;
fp = stack_alloc();

pthread_t tid;
for(int i = 0; i < 100; i++)
{
pthread_create(&tid,NULL,thr_fn,(void *)fp);
}

sleep(1000);

return 0;
}


  makefile文件如下:

lock_free:
g++ lock_free_stack.c main.c -lpthread -D__LOCK_FREE_VERSION -march=nocona -o lock_free
lock:
g++ lock_stack.c main.c -lpthread -D__LOCK_VERSION -o lock

clean:
rm lock lock_free


  得到最终的测试数据如下所示:



  相比较而言,在性能上lock_free_stack有微弱的优势。

注:

1)大家在看代码时可能已经注意到了,main.c 33行free()函数被注释掉了,这是有意为之,在主动进行内存管理时,Lock Free结构会遇到ABA问题,需要CAS2配合Sequence Number加以解决。感兴趣的读者可以继续阅读文后的参考文献;

2)主线程需要等待所有的子线程执行完毕,这里不好使用pthread_join,本文作者采用取巧的办法,调用sleep函数进行等待,具体等待时间可能和测试环境相关;

3)
__sync_bool_compare_and_swap
是由GCC提供的内置函数(GCC 4.1或更高版本),编译时需要指定-march参数;

参考资料:

[1]搜狗实验室   C10K与高性能程序续篇  http://www.sogou.com/labs/report/4-2.pdf

[2]W.Richard Stevens Advanced Programming in the Unix Environment  11.6节 线程同步

[3]设计不适用互斥锁的并发数据结构 http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/

[4]解决了“undefined reference to `__sync_bool_compare_and_swap_4”的编译错误 /article/3776532.html

[5]compare_and_swap  http://en.wikipedia.org/wiki/Compare-and-swap
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: