您的位置:首页 > 编程语言

数据插入一种基于CAS的无锁并发HashTable设计及C代码实现

2013-05-12 16:48 701 查看
发一下牢骚和主题无关:

在多线程环境下,我们常经常使用Java的ConcurrentHashMap,但其实这个Map仍然是要应用锁的,只不过应用了一种被称为StripeLock的方式。这里我们试着实现一个完全无锁的HashTable。

首先我们必须弄清楚HashMap有哪些操纵,哪些地方是临界区(CriticalRegion),哪里不是。一个完整的HashMap不过就以下几种操纵:

1. 插入数据(PUT);

2. 取数据(GET);

3. 扩容(RESIZE);

4. 删除数据(DELETE)。

这里我们应用链表数据结构的Map。这类数据结构的特点是,解决冲突是将冲突数据持到上一个以后。



插入数据(PUT):

单线程的插入我想大家理解都不是问题,插入的进程分成三步:

1)新建一个结点node;

2)将head指针的next值赋值给node的next域;

3)将head指针指向新建的node。

在并发环境下,问题三于上述上个步调并非原子的。例如线程1进行到第二步时,如果另一个线程2一次性完成了以上三个步调,这个时候线程1再进行3时就会形成线程插入的数据丧失。这类丧失在C语言中会直接导致内存泄漏(MemoryLeak)。



所以问题的关键的就于在完成第三步的时候必须确保head指针仍处于第一步操纵时的状态,这样才能正确完成操纵,这里我们就要用到CAS操纵,Linux下glibc自带这个方法。CAS(CompareAnd Swap)方法签名如下:

T __sync_val_compare_and_swap(T* __p, T __compVal, T __exchVal, ...);

其中__p指的是指向数据类型T的指针,__compVal是要比较的值,__exchVal是如果__p与__compVal相称,则须要向__p指向的数据域赋值的数据。如果__p与__compVal不相称,则什么也不做。刚才说过了,INSERT在并发操纵时的问题在于在操纵head时,head指向的值已经不是INSERT开始时的值,所以我们可以利用__sync_val_compare_and_swap来进行原子操纵,如果不成功,则从新开始插入操纵。这样,当head发生变化时,就不会对head进行操纵,取而代之的是重头开始执行插入操纵,保证了团体的一致性。以C代码为例,实现的代码如下:

void insert(Node* head, Key key, Value value)
{

Node* new_entry = malloc(sizeof(Entry));

new_entry->key =key;

new_entry->value= value;

// use CAS to enablelock free

while(1)
{
prev_head = head;

new_entry->next = (struct Entry*)prev_head;

if (CAS(&head,prev_head, new_entry) == prev_head)

{

break;

}

}
}


为了方便取数据,我们须要对插入的数据进行排序(原因前面会说到),按key降序排列。这样在查找时当key>= cur->key时我们就能够停止向后搜寻了。

Node* cur = head;
Node* prev = cur;

while(cur != NULL && key < cur->key)
{
prev =cur;
cur =cur->next;
}
insert(&prev, key, value);


但如果这里不进行特殊处理,就会出现一个问题:在并发环境下,节点随时有可能被插入,但while中的语句并非原子的,这就可能得到错误的插入点。例如有 key序列 10 9 5 2,在插入3时,我们遍历到节点5,发现5以后为2,满足while条件,于是我们以为应当在这里进行插入。但如果这个时候另一个线程在5以后插入了4,则显然正确的插入点应当酿成4,而上述代码仍按5进行插入,则会导致乱序,变为10 9 5 3 4 2。同上,为了解决这个问题,每当指针发生变化时(见下),我们都必须从新查找:

void insert(Node* head, Key key, Value value)
{
Entry* new_entry = malloc(sizeof(Entry));
new_entry->key =key;
new_entry->value= value;

// use CAS to enablelock free
while(1)
{
Node** prev = &head;
Node* cur = head;
while(cur != NULL && key < cur->key)
{
prev = &cur->next;
cur = cur->next;
}
new_entry->next= *prev;
if (CAS(prev, cur, new_entry) == cur)
{
break;
}

cur = *prev;
}
}


这样,当选定插入点后,如果有其它线程抢占插入,则if (CAS(prev, cur, new_entry) == cur)条件便不会成立,确定以下三种情况之一发生了:

1、prev以后有新的结点插入;

2、cur指向的结点被删除了;

3、next被删除了。

这个时候就须要从新进行选定插入点的工作。

取数据(GET):

每日一道理

生命不是一篇"文摘",不接受平淡,只收藏精彩。她是一个完整的进程,是一个"连载",无论成功还是失败,她都不会在你背后留有空白;生命也不是一次彩排,走得不好还可以从头再来,她绝不给你第二次机会,走过去就没法回头。

因为我们应用的是链接方式来解决冲突的,如果Hash对应的结点已经挂了多个数据,那么我们在取数据的时候就须要遍历所挂的节点来找到我们真正须要的数据,这里就可能出现因为数据后期插入而我们的游标已经移到其以后而返回没有对应数据这类不一致的情况。例如有key列表: 10 9 5 2,我们要搜索结点3,当我们遍历到5时,如果另一线程B在头处插入了3,由于遍历的时光复杂度为O(n),当数据量大时,就能昭示察觉到这类延迟了,显然达不到设计要求。

就会返回不存在该值的结果。
为了解决这个问题,我们可以对链表进行排序来解决这个问题,当条件 if(key < cur->key) 满足时,这个时候要我们要查找的数据只会在cur以后,所以我们可以放地把cur向后移,直到上述条件不成立,这个时候我们再判断就能够了。为什么这样可以解决问题,因为排序后如果插入数据的key大于当前的,表现它会被插到cur之前,如果小于,至少也是在cur以后,最差的情况就是刚好在prev与cur之间,这时我们只须要从新扫描一次就能够了(其实也可以从prev处开始,但我们应用的是单链表,没法得知prev的prev了)。为了保险,我们可以再加一次判断。代码如下:

Value find(Node* head, Key key)
{
Node** prev;
Node* cur;
Node** next;

Value cvalue;
Key ckey;

while(1)
{
prev = &head;
cur = head;

while(1)
{
if(cur == NULL)
{
return NULL;
}

next = &cur->next;
ckey = cur->key;
cvalue = cvalue->value;

if(*prev != cur)
{
break; // The list has been modified, start over
}

if(key >= ckey)
{
return key == ckey ? cvalue : NULL;
}

// else keep looking
prev = next;
cur = *next;
}
}
}

为发增加find的利用价值,我们可以在find函数里带出prev,cur,next指针,以供其它函数应用,签名如下:

Value find(Node* head, Key key, Node*** prev, Node** cur, Node*** next)


详细代码实现详见DELETE小节。

虽然这样做后仍然有可以出现另一线程在(prev_cur, cur]之间插入数据的可能,但这已经大大减少了其可能性,误差时光在实时操纵范围以内。

代码中的条件语句

if(*prev != cur)
{

break; // The list has been modified, start over

}


成立时表示当前结点已经被其它线程修改了,插入了新的数据,必须从新进行查找。

对于这类情况,首先我们可以对key进行排序来简化此问题,当对key进行排序后,扫描后的结点key都是比搜索key大的结点,因此即便有其它线程插入了新结点,也不会是所求的结点,我们可以疏忽此种插入。我们独一须要关怀的就是当 key >= cur->key时,因为这时候须要搜索的结点极可能就在 (prev_cur, cur] 区间,如果这时候另一线程向此区间插入了对应结点,则我们就可能错过。
,假设我们搜索结点3,在遍历到2时,我们发现 2 < 3,即前面弗成能有即是3的值了(key已经按降序排列),这个时候,如果另一个线程向5后插入了3,则我们仍然返回的是没有找到数据。这就会形成不一致的情况。为了解决这个问题,我们必须在prev_cur变革时进行从新搜索,解决方案也很简略,类似于下面的插入操纵。代码如下:

删除数据(DELETE):

现在我们来谈谈数据的删除操纵。要想删除数据首先我们得找到对应的数据结点,然后更新链表即可。假设当前结点前,中,后对应的指针分别为prev, cur, next,操纵进程如下:

1)prev = cur->next;

2)free(cur);

在并发环境下有可能发生这样的情况

1)另一线程在prev与cur间插入了新的结点,这个时候如果再执行prev = cur->next就会导致新插入的结点丧失

2)另一线程在cur与next间插入了数据,同上,这类情况也会导致新插入的结点丧得到。

上述两种情况都会导致内存泄漏,是不是接受的。我们可以应用CAS操纵来完成删除操纵。代码如下:

int remove_node(Key key)
{
Node** prev;
Node* cur;
Node** next;
while(1)
{
if(find(key, &prev, &cur, &next) == NULL)
{
return 0;
}

if(CAS(prev, cur, *next) == cur)
{
free(cur);
}
}
}

但是这里有个问题,就是在并发环境下,新插入的结点如果恰好在被删结点以后,下面的算法就可能导致数据丧失。为了解决这个问题,我们须要在删除之前对其进行标记,此标记是个boolean型。当然我们可以简略地添加一个short型什么的,但由于C在编译时会进行内存对齐,所以其可能占用更多的空间。例如:

Entry
{
void* key;
void* value;
short tag;
}

则这个Node在32位机上实际占用的空间为12bytes,关于更多的内存对齐信息,可以访问 http://en.wikipedia.org/wiki/Data_structure_alignment

这里有个小技巧,因为指针都是按照机器字进行对齐的,所以最低位始终为0(不然就是奇数了),我们可以利用其存储标记位,为了方便,我们将其中一些操纵定义成宏:

typedef uintptr_t marked_ptr_t;
#define MARK_OF(x) ((x) & 1) // 对最低位进行标记
#define PTR_MASK(x) ((x) & ~(marked_ptr_t)1)
#define PTR_OF ((Node*)PTR_MASK(x)) // 取出除第1位的所有数据,组成Node的指针
#define CONSTRUCT(mark, ptr) (PTR_MASK((uintptr_t)ptr) | (mark)) // 将Node指针的最低位置成mark(0或1)


注意,由于指针并不属于数值型数据,我们不能直接其进行位操纵,所以我们须要把其转换成uintptr_t。

那么新的代码就应当是这样的了:

int remove_node(marked_ptr_t* head, const char* key)
{
marked_ptr_t cur;
marked_ptr_t* prev;

while(1)
{
if(find(head, key, &prev, &cur) == NULL)
{
return 0;
}

if (CAS(prev, CONSTRUCT(0, cur), CONSTRUCT(1, cur)) != CONSTRUCT(0, cur)) {continue;}
if (CAS(prev, CONSTRUCT(1, cur), PTR_OF(cur)->next) == CONSTRUCT(1, cur))
{
free(PTR_OF(cur)); // may be we should consider IBM freelist
}
else
{
find(head, key, NULL, NULL); // use find to remove the marked node
}

return 1;
}
}

那么由于标记的引入,我们还须要对find方法进行一些修改,也有就是如果该结点已经被标记了,我们不能应用之,如下:

void* find(marked_ptr_t* head, const char* key, marked_ptr_t** prev, marked_ptr_t* cur)
{
marked_ptr_t* tp_prev;
marked_ptr_t tp_cur;
marked_ptr_t* tp_next;

if(PTR_OF(*head) == NULL)
{
if(prev) {*prev = head;};
if(cur){*cur = *head;};

return NULL;
}

while(1)
{
tp_prev = head;
tp_cur = *head;

while(PTR_OF(tp_cur) != NULL)
{
tp_next = &PTR_OF(tp_cur)->next;

if(*tp_prev != tp_cur)
{
break; // someone has mucked with the list, start over
}

if(MARK_OF(tp_cur))
{
if (CAS(tp_prev, CONSTRUCT(1, tp_cur), tp_next) == CONSTRUCT(1, tp_cur)) {
free(PTR_OF(tp_cur)); // 标记1
tp_cur = *tp_next;
continue;
} else {
break; //start over
}
}

if (key >= PTR_OF(tp_cur)->key)
{
if(prev){*prev = tp_prev;};
if(cur){*cur = tp_cur;};

return strcmp(key, PTR_OF(tp_cur)->key) == 0 ? PTR_OF(tp_cur)->value : NULL;
}

tp_prev = (marked_ptr_t*)&PTR_OF(tp_cur)->next;
tp_cur = (marked_ptr_t)*tp_next;
}

return NULL;
}
}

这里请大家注意一下“标记1“处的代码,我们应用的是直接free的形式,这类形式的问题在于,我们在free的时候,可能另一个线程已经将其指针返回了,即有其它线程在引用。这个时候如果我们将其释放了必然会导致另一个线程段错误。这里我们可以简略地应用“引用计数”来解决这个问题,为此这个引用计数可以放在Node里或者返回的数据里,也就是说free的时候只free Node本身,为了简略我们选择后一种,即放在数据里,这样Value如下:

typedef struct
{
Data data;
volatile uint32_t ref_count;
} Value;

这样我们就须要增加两个函数incr_ref 与 desc_ref,从概念上来说,desc_ref 可以这么写:

void desc_ref(Value* value)
{
if(__sync_sub_and_fetch(&value->ref_count, 1) == 0)
{
free(value->data);
free(value);

return;
}
}

incr_ref 要稍稍复杂一些,因它必须保证 ref_count > 0 再在其基础上 +1 整个是个原子操纵,否则就可能拿到脏数据。

Value* incr_ref(Value* value)
{
uint32_t old_count;

while(1)
{
old_count = value->ref_count;

if(old_count <= 0)
{
return NULL; // already deleted
}

if(CAS(&value->ref_count, old_count, old_count + 1) == old_count)
{
break;
}
}

if (value->tag != "0x32da2f") // 标记1
{
return NULL;
}

return value;
}

大家注意“标记1”处的代码,为什么须要有这个tag呢? 这是个非常有趣的并发问题,在超高并发环境下,有可能出现incr_ref时,Value的内存已经被释放并分配给其它线程且存入了对应的数据,大家都知道C里同进程没有内存保护的概念,这个时候Value->ref_count仍然是可以取到数据的,只不过可能是其它的数据什么的,这时极可能就是一个大于0的值,incr_ref自然会正常返回,这里再去操纵value就会有意想不到的结果,非常危险!所以我们须要指定个随机标识来确定这块内存还是不是我们以为的Value。当然要解决这个问题还其它很多方法了,这里只是起个抛砖引玉的作用了。

扩容(RESIZE):

在并发环境下,对hashtable进行扩容主要有两个问题:

1)由于每个key hash过后对应独一的一个entry下标,在扩容后,对应的下标就会发生变化。例如如果我们采用对key取模来计算hash值,假设初始hashtable的大小为4,则5对应的地址为 5 % 4 = 1,当hashtable扩容到8以后,5对应的下标却成了5。这样,再次应用5作用key去取数据时便不能获取到数据了;

2)如果不能够继续分配连续的地址空间,则须要将原来的数据拷贝到新申请的地址空间,虽然我们应用的hashtable的链表实现方式,可以免去对详细元素的拷贝,但这样仍然须要复制指针,这里就很难保证一致性了。

对于这些问题,首先想到的就是一致性hash,但一致性hash也存在部分节点的迁移问题,如果采取先迁移再启用的机制,那么又会存在一个增量迁移(迁移速度能否跟上数据插入速度)的问题。为了解决这个问题我们可以预分配地址空间,事先把最大的bucket数量确定下来,一般一个整页为宜(在32位机器上一个指针的大小是4B,NTFS页文件大小为4KB,这样我们至少可以存储下1K个bucket,假设一个bucket链10个数据,一共就有10K存储空间,一般应用已足够。而且这样连初始化都省略了,非常方便~代码如下:

typedef struct
{
marked_ptr_t buckets[MAX_NODE_SIZE];
uint32_t size;
} LF_HashTable;


C语言实现(IMPLEMENT):

好了,说了这么说,现在我们把下面的思想汇聚起来,最终的代码可以在这里找到:

https://github.com/sefler1987/lf_table

还有什么问题(PROBLEMS):

1)Node的分配释放问题,本文里对Node的创建释放都采取的是最简略的malloc与free方法, 这类方法效率低,不利于资源的回收,可以采用freelist或者pool的形式;

2)由于buckets数据是固定的,如果最开始的buckets没有确定好,在单bucket的load_factor较高时会退化成线性查找。

参考资料(REFERENCES):

1)High Performance Dynamic Lock-Free Hash Tables and List-Based Sets http://www.research.ibm.com/people/m/michael/spaa-2002.pdf

文章结束给大家分享下程序员的一些笑话语录:

Google事件并不像国内主流媒体普遍误导的那样,它仅仅是中国Z府和美国公司、中国文化和美国文化甚至中国人和美国人之间的关系,是民族主义和帝国主义之间的关系;更重要的是,它就是Z府和公司之间的关系,是权力管制和市场自由之间的关系。从这个意义上说,过度管制下的受害者,主要是国内的企业。Google可以抽身而去,国内的企业只能祈望特区。www.ishuo.cn
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐