您的位置:首页 > 其它

多线程安全的滑动窗口设计实现

2014-09-24 14:14 225 查看
滑动窗口是日志模块重要的数据结构,用于日志发送接收以及日志索引查询,和组内同学讨论了的多线程安全的滑动窗口设计,有三种实现方案,写此文档记录下。

1.接口描述

滑动窗口内部使用数组,每个数组项的是一个结构体:

Structentry

{

Struct ValueNode *head;

Struct ValueNode *tail;

Int64_t cnt;

Int64_tstat;

}

由于在对同一项多次写入不同值的情况下,写入的多个值会以链表组织,head指向链表头,tail指向链表尾,cnt表明读取当前entry的引用计数,包括链表中所有节点。

StructValueNode的定义如下:

StructValueNode

{

Void *value

Struct ValueNode *next;

}

滑动窗口需要提供以下接口:

1. Init(int64­_t size)

初始化滑动窗口,size 用于指明滑动窗口的大小。

2. set(int64­_t id, const void*val)

set接口用于向滑动窗口中写入数据,id用于指明所写入数据的序号,val指向写入 的数据指针。对同一个id插入不同的值,会发生覆盖。

3. get(int64­_t id, void* &val)

get接口用于从滑动窗口中读出数据,id用于指明所读数据的序号,val指向所读到 的数据的指针。

4. revert (int64­_t id)

读取某一项结束时候,需要调用revert接口。

5. move_foward ()

move_foward用于将滑动窗口向前移动,对于移除滑动窗口的项,需要调用其revert 接口,将entry重置,方便后续复用此接口。

2方案一:读写锁保护start_id

方案一是并发度较低但思路比较简单的实现方案,此方案中,滑动窗口需要维护的成员变量:

1. size:此变量用于指明滑动窗口的大小;

2. start_id:此变量用于指明滑动窗口中最小的id;

3. end_id:此变量用于指明滑动窗口中最大的id;

4. rw_lock:用于保护start_id;

接口实现描述:

1. Init(int64_t size):

将size记录到成员变量中,并申请数组内存(大小为size),将start_id end_id设置为0;

2. set(int64_t id, const void*val)

1) 对rw_lock 加读锁;

2) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

3) 使用id对size取模,找到对应的entry,读取此entry的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);

4) 对rw_lock解锁。

3. get(int64_t id, void* &val)

1) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

2) 将当前start_id记录到临时变量tmp_start_id中;

3) 根据id取模,找到对应的entry,如果cnt == 0,跳转步骤7;

4) 将cnt引用计数递增,然后将tail指针所值的vallue 赋值给val;

5) 读取当前start_id和tmp_start_id比较,如果不相等,则跳转步骤1;

6) 返回;

4. revert (int64­_t id)

1) 根据id找到都应的entry,如果cnt ==0,报错;

2) 递减cnt,如果递减后大于0,则退出;

3) 如果递减后的cnt ==0,则遍历head指向的链表,调用每个value的revert函数,并将每个ValueNode内存释放;

5. Move_forwad()

1) 调用get接口,读取start_id位置的状态,根据注册的函数,判断是否可以将其移动出滑动窗口,如果不可以则返回;

2) 将rw_lock加写锁,从start_id开始,扫描滑动窗口,对于每个entry采用以下操作:

a) 根据注册的函数判断是否可以移出滑动窗口,如果不可以,跳转到步骤3;

b) 如果可以,则递减其引用计数,如果递减后不为0,则需要阻塞等待;

c) 将start_id递增1;

3) 对rw_lock解锁,函数返回。

3.方案二:无锁(一)

在方案一中,读写锁在一定程度上影响了并发度,方案二将介绍一种可以不用读写锁的线程安全实现。需要补充的是,方案二需要在Entry中增加stat字段,stat有三个可选值:

1. NULL

表明此entry无人使用,可以写入数据;

2. USE

表明此entry正在被使用,可以读;

3. LOCKED

表明此entry正在处于NULL和USE中间状态,不可读也不可写入;

此外,滑动窗口还需要维护成员变量Last_start_id,其代表在一次move_forward()过程中,上一次的start_id,last_start_id 和start之间的entry,是需要调用reset()清除掉的。

在这些基础上,接口描述如下:

1. Get():

与方案一一致

2. Revert():

与方案一一致。

3. set():

1) 判断是否满足start_id <= id < last_start_id+ size,如果不满足则跳转到步骤5;

2) 使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3) 将entry状态修改为LOCKED,再次判断是否满足start_id <= id <last_start_id+ size;如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4) 读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5) 返回

4. move_foward()

1) 先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2) 将start_id记录到临时变量tmp_start,然后将start修改为target_start,这需要在一个原子操作中完成

3) 对于tmp_start到target_start中间的每一个entry,执行以下操作:

a) 判断其stat是否为LOCKED,如果是则阻塞等待;

b) 将stat修改为LOCKED,;

c) 递减其引用计数,如果递减后不为0,则需要阻塞等待,如果为0,则释放其内存,并设置其状态为NULL;

4) 比较last_start_id 和tmp_start的大小关系,如果相等,则将last_start 修改为target_start ,否则阻塞等待。

在这个方案中,其实是通过给每个entry添加状态值,对每个entry的修改做并发控制,相对于方案一,减小了锁粒度。

在move_foward()接口实现的第四步中,比较last_start和tmp_start的大小关系,事实上是为了保证,当多个线程同时调用move_foward()接口,同时修改last_start_id时,能够做到串行化,即保证last_start_id顺序递增修改。

4.方案二:无锁(二)

下面介绍第二种无锁实现,在此方案中,无需维护last_log_id,但还需要维护entry状态。

在这些基础上,接口描述如下:

1. Get():

与方案一基本一致,但需要判断所读的ENTRY状态,如果是LOCKED,则需要返回步骤一重新判断。

2. Revert():

与方案一一致。

3. set():

1) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤5;

2) 使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3) 将entry状态修改为LOCKED,再次判断是否满足start_id <= id<start_id + size:如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4) 读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤5;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5) 返回。

4. move_foward()

1) 先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2) 将start_id记录到临时变量,tmp_start中,对于start到target_start中间的每一个entry,执行以下操作:

a) 判断其stat是否为LOCKED,如果是则阻塞等待;

b) 将stat修改为LOCKED,重新读取start,判断start_id是否等于tmp_start,如果不等,则跳转到步骤2开始;

c) 递减其引用计数,如果递减后不为0,则需要阻塞等待;如果为0,则释放其内存,并设置其状态为NULL;

d) 将start_id递增1;

4.特殊需求:

在我们的设计中,新当选的leader需要写一条sync barrier日志,之后才能处理滑动窗口中的未决日志。如果此时滑动窗口中普通的未决日志已经写满,则无法再写入sync barrier日志,导致恢复流程失败。

因此,滑动窗口需要提供一种特殊接口:set_common_entry()和set_special_entry(),同时初始化时候需要传入common_size和special_size,通常,special_size>common_size。两个不同的接口使用不同的size,保证sync_barrier日志可以写入滑动窗口。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: