您的位置:首页 > 大数据 > 人工智能

libev 学习笔记之timer实现原理

2016-07-02 19:18 609 查看

一. timer简介

定时器

简单而言,一个定时器(timer),就是在定义的超时时间过后主动去执行指定的超时处理函数的组件

定时器典型使用场景

定时器(timer)是网络通信必不可少的一个组件,在复杂的网络环境下,端到端通信时有失败。一个典型的使用场景是,A主机发送写文件指令到B主机,B主机收到写请求后开始写,但由于环境原因B主机上出现IO-hang并且不可恢复,此时B主机的写线程会一直阻塞,当然也不会主动回复写结果给A主机;此时A主机能做的就只有傻傻等待,进而造成读写阻塞,白白的浪费自己的系统资源。而如果A发送请求到B时在A上启动一个timer,此问题便可以缓解。此时,A和B之间的一次同步通信就有两种终止方式,一种是B回复写结果给A,A终止同步通信,另一种是A上设置的定时器超时即在期望时间内B未回复写结果给A,A认为要么网络异常要么B已挂,此时A就可以直接关闭同步连接,进而继续去处理其他任务。

二. libev中的timer

libev原生支持定时器功能,底层通过堆来管理多个定时器,效率较高。linux下,通过epoll来精准的控制定时时间,支持毫秒级定时器

libev中定义了定时器相关的结构:
struct ev_timer
,通过此结构来实现定时时间和超时回调函数的绑定

/* shared by all watchers */

#define EV_WATCHER(type)            \

int active; /* private */         \
int pending; /* private */            \
EV_DECL_PRIORITY /* private */        \
EV_COMMON /* rw */                \
EV_CB_DECLARE (type) /* private */

#define EV_WATCHER_TIME(type)\

EV_WATCHER (type)\
ev_tstamp at;     /* private */

/* invoked after a specific time, repeatable (based on monotonic clock) */
/* revent EV_TIMEOUT */
typedef struct ev_timer
{
EV_WATCHER_TIME (ev_timer);
ev_tstamp repeat; /* rw */
} ev_timer;


libev中对timer的底层数据结构支持

通过堆来管理诸多定时器,又同时支持二叉树实现的最小堆和四叉树实现的最小堆

libev中定义了专门操作定时器的相关函数

ev_timer_set

绑定超时时间和重复增量时间到ev_timer

ev_timer_init

初始化timer,绑定超时回调函数到ev_timer中,调用
ev_timer_set
,绑定定时时间,指定timer在第一次超时后的增量超时设定时间

ev_timer_start

置ev_timer状态为激活状态,将ev_timer结构添加到ev_loop中的timers数组中,并且对在timers中建立的小顶堆结构进行调整维护

ev_timer_stop

置ev_timer状态为非激活状态,从timers数组中删除ev_timer对象,并且对在timers中建立的小顶堆结构进行调整维护

ev_timer_again

如果timer是激活状态,则判断ev_timer结构中的
repeat
项是否为0,如果是,则直接调用
ev_timer_stop
停止timer,否则对timer的下次超时时间进行调整并对timers堆进行调整;如果timer为未激活状态,则判断ev_timer结构中的
repeat
是否为0,若不为0,则设置timer时间为
repeat
指定时间,并且调用
ev_timer_start
启动timer

ev_timer_remaining

返回距离下次timeout超时所剩余的时间

定时实现

通过指定epoll_wait的最后一个参数来实现定时触发回调

三. libev中timer触发流程

timer使用流程



如上图所示,基本的使用流程极其简单。其中timer的回调函数
timer_cb
的函数原型为:
void (*cb)(struct ev_loop *loop, struct ev_timer *w, int revents)


timer触发流程

主要触发流程在主循环
ev_run
函数中,在主循环开始之前需要先将timer装填到
struct ev_loop
结构中,装填函数为
ev_timer_start


ev_timer_start

如上所述,主要将新的timer插入在
ev_loop
结构的
timers
成员中维护的最小堆。

ev_run

是所有时间循环的入口函数,在函数体内的大循环中调用
epoll_wait
来等待事件触发,调用
EV_INVOKE_PENDING
ev_invoke_pending
函数来分发处理所有就绪事件。对于timer事件而言,简单的时间循环流程就是从
ev_loop
timers
最小堆中取出堆顶元素作为
epoll_wait
的超时返回时间,当
epoll_wait
返回后,即表明定时时间已到,即可调用超时回调函数处理超时事件。

timer_loop



其中
timers_reify
函数内对就绪的timer进行预处理,更新就绪timerout的所有timer,如果timer的
repeat
不为0,则在当前时间基础上增加
repeat
的值作为时间差并调整最小堆;否则,调用
ev_timer_stop
来停止timer。紧接着将预处理过的就绪的timer放入
ev_loop
结构的
pendings
数组中。之后的
EV_INVOKE_PENDING
ev_invoke_pending
函数会遍历
pendings
数组,并
EV_CB_INVOKE
即调用
timer_cb
回调函数完成超时事件处理。

四. ev_loop对timer的管理

在上述timer触发流程中,可以看到,在整个以epoll为主的事件循环中,libev主要依赖
truct ev_loop
结构来组织管理更新所有的timer。其实除了timer如此,libev中所有的事件循环中的事件都是填充到
ev_loop
中来统一管理的。

1. timer底层数据结构: 最小堆

数组实现的堆结构

struct ev_loop
成员
ANHE * timers


是所有timer的集合,是一个
ANHE *
的数组,在数组内维护了一个最小堆,用以快速的找到所有定时器中超时时间离当前系统时间最接近的timer,采纳此超时时间作为
epoll_wait
的超时返回时间。

其中,
ANHE
结构定义如下:

/* base class, nothing to see here unless you subclass */
typedef struct ev_
4000
watcher_time
{
EV_WATCHER_TIME (ev_watcher_time)
} ev_watcher_time;

typedef ev_watcher_time *WT;

/* a heap element */
typedef struct {
ev_tstamp at;
WT w;
} ANHE;

#define ANHE_w(he)        (he).w     /* access watcher, read-write */

#define ANHE_at(he)       (he).at    /* access cached at, read-only */

#define ANHE_at_cache(he) (he).at = (he).w->at /* update at from watcher */


其中,
ev_watcher_time
被看作为基类看待,而
struct ev_timer
定义为:

/* invoked after a specific time, repeatable (based on monotonic clock) */
/* revent EV_TIMEOUT */
typedef struct ev_timer
{
EV_WATCHER_TIME (ev_timer)

ev_tstamp repeat; /* rw */
} ev_timer;


可见,
ev_timer
包含有
ev_watcher_timer
所仅有的若干成员,因此,可以将
ev_watcher_timer
作为
ev_timer
的父类来看待。

堆的创建、调整

struct ev_loop
timers
成员数组实现的最小堆,具体最小堆相关操作,此处不予赘述。

2. 带优先级的就绪事件
pendings
数组

2.1
pendings
数组

/* stores the pending event set for a given watcher */
typedef struct
{
W w;
int events; /* the pending event set for the given watcher */
} ANPENDING;

/* these priorities are inclusive, higher priorities will be invoked earlier */
#ifndef EV_MINPRI
# define EV_MINPRI (EV_FEATURE_CONFIG ? -2 : 0)
#endif
#ifndef EV_MAXPRI
# define EV_MAXPRI (EV_FEATURE_CONFIG ? +2 : 0)
#endif

// 4
#define NUMPRI (EV_MAXPRI - EV_MINPRI + 1)

VAR (pendings, ANPENDING *pendings [NUMPRI])
VAR (pendingmax, int pendingmax [NUMPRI])
VAR (pendingcnt, int pendingcnt [NUMPRI])
VARx(int, pendingpri) /* highest priority currently pending */
VARx(ev_prepare, pending_w) /* dummy pending watcher */


可见,
pendings
数组是一个四维数组。其中每一维数组用来存储相同优先级的就绪事件集合。

2.2
pendings
数组的“生产”和“消费”

生产

上面提到的
timers_reify
函数内,会将所有就绪的timer插入
ev_loop
pendings
数组。

/* make timers pending */
inline_size void
timers_reify (EV_P)
{
EV_FREQUENT_CHECK;

if (timercnt && ANHE_at (timers [HEAP0]) < mn_now)
{
do
{
ev_timer *w = (ev_timer *)ANHE_w (timers [HEAP0]);

/*assert (("libev: inactive timer on timer heap detected", ev_is_active (w)));*/

/* first reschedule or stop timer */
if (w->repeat)
{
ev_at (w) += w->repeat;
if (ev_at (w) < mn_now)
ev_at (w) = mn_now;

assert (("libev: negative ev_timer repeat value found while processing timers", w->repeat > 0.));

ANHE_at_cache (timers [HEAP0]);
downheap (timers, timercnt, HEAP0);
}
else
ev_timer_stop (EV_A_ w); /* nonrepeating: stop timer */

EV_FREQUENT_CHECK;
feed_reverse (EV_A_ (W)w);
}
while (timercnt && ANHE_at (timers [HEAP0]) < mn_now);

feed_reverse_done (EV_A_ EV_TIMER);
}
}


其中,
feed_reverse
函数,先将从堆顶开始遍历就绪的timer依次放入
ev_loop
rfeeds
数组中。

inline_speed void
feed_reverse (EV_P_ W w)
{
array_needsize (W, rfeeds, rfeedmax, rfeedcnt + 1, EMPTY2);
rfeeds [rfeedcnt++] = w;
}


之后统一调用
feed_reverse_done
函数,将
rfeeds
数组中的就绪事件通过
ev_feed_event
函数插入到
pendings
中。

inline_size void
feed_reverse_done (EV_P_ int revents)
{
do
ev_feed_event (EV_A_ rfeeds [--rfeedcnt], revents);
while (rfeedcnt);
}


需要注意的是,
rfeeds
数组中存储的就绪timer,是从最小堆的堆顶开始依次取得的,即就绪事件的timeout时间与当前时间的时间差越来越大
(((ev_timer *)rfeeds[0])->at
<
((ev_timer *)rfeeds[1])->at
<
((ev_timer *)rfeeds[2])->at
< …),如其函数名此处调用
ev_feed_event
传递的参数,是从
rfeeds
数组中逆序得到的,为什么要这样做呢?下文将予以解答。

# define ABSPRI(w) (((W)w)->priority - EV_MINPRI)

void noinline
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
W w_ = (W)w;
int pri = ABSPRI (w_);

if (expect_false (w_->pending))
pendings [pri][w_->pending - 1].events |= revents;
else
{
w_->pending = ++pendingcnt [pri];
array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
pendings [pri][w_->pending - 1].w      = w_;
pendings [pri][w_->pending - 1].events = revents;
}

pendingpri = NUMPRI - 1;
}


可以看出,
ev_feed_event
函数的做法是,先得到就绪事件watcher的绝对优先级(默认为2,即 0 - (-2)),以此为索引,找到就绪事件优先级所对应的
pendings
中的一维数组,然后求得在该一维数组中的下标,赋予watcher的
pending
,然后将就绪时间watcher和就绪事件类型放入
pendings
数组。

在得到索引时:
w_->pending = ++pendingcnt [pri]
用到的
pendingcnt
数组,是一个含有四个元素的一维数组,与
pendings
数组相对应。
pendingcnt
数组的 ‘0~3’ 四个元素代表了四个优先级,其中每个元素都是一个对应于该优先级的计数器,亦即其中每个元素的值代表了该优先级下现有的就绪事件个数,顺序增长。

消费

在整个事件循环中,
EV_INVOKE_PENDING
用来完成最后的就绪事件分发。

# define EV_INVOKE_PENDING ev_invoke_pending (EV_A)

void noinline
ev_invoke_pending (EV_P)
{
pendingpri = NUMPRI;

while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
{
--pendingpri;

while (pendingcnt [pendingpri])
{
ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];

p->w->pending = 0;
EV_CB_INVOKE (p->w, p->events);
EV_FREQUENT_CHECK;
}
}
}


从函数实现可看出,就绪事件分发顺序由优先级来定,从最高优先级到最低优先级来层层分发处理。

如之前所述,
pendingcnt [pendingpri]
,表示了
pendingpri
优先级下的就绪事件个数。

取事件时,
ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];
是从
pendings[pendingpri]
一维数组中逆序取得。由之前
feed_reverse_done
函数实现知道,在
pendings[pendingpri]
数组中

((ev_timer *)pendings[pendingpri][0])->at
>
((ev_timer *)pendings[pendingpri][1])->at
>
((ev_timer *)pendings[pendingpri][2])->at
>…,
at
值越小,说明定时器的定时时间与当前时间时间差越大,即超时的越早,当然就应该优先去处理。因此,此处是逆序处理的,也可以解释为什么
feed_reverse_done
是逆序添加就绪timer了。

五. 总结

以上即为除了时间堆的维护细节外libev中对timer的实现细节,主要依赖于主循环结构体
struct ev_loop
来完成对timer的管理,依赖
epoll_wait
来实现定时返回。实现简单,但libev的设计思路却极其巧妙。上述诸多结构,除了支持timer外,其他的事件也同样支持。libev用C语言的结构体与宏定义结合实现了C++中的类继承关系,宏定义的强大初露锋芒。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息