您的位置:首页 > 其它

libevent札记 - libevent是怎么开始工作的

2017-08-22 13:48 393 查看
原文链接

1.Reactor模式

libevent采用的是Reactor模式,一种事件列表机制,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

了解过异步模式的应该知道,异步模式一般也都有回调函数,但在这里,Reactor不是异步的,Reactor更像是一种同步模式框架,尽管可以同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序,实际上数据拷贝还是阻塞的

由于Reactor模式不是异步的,所以可以预见,如果Reactor处理耗时长的操作会造成事件分发的阻塞,影响到后续事件的处理;(Proactor是异步的)

2.主循环



2.1 主循环函数event_base_loop(current_base, flags);

event_dispatch-->event_base_loop

int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done;

/* clear time cache */
base->tv_cache.tv_sec = 0;

if (base->sig.ev_signal_added)
evsig_base = base;  //处理signal时,指名所属的base实例
done = 0;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif

while (!done) {
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
base->event_gotterm = 0;
break;
}
if (base->event_break) {
base->event_break = 0;
break;
}

/* You cannot use this interface for multi-threaded apps */
while (event_gotsig) {
event_gotsig = 0;
if (event_sigcb) {
res = (*event_sigcb)(); //信号处理回调
if (res == -1) {
errno = EINTR; //错误代码,信号中断
return (-1);
}
}
}
//校正系统时间
timeout_correct(base, &tv);

// 根据 timer heap中事件的最小超时时间,计算系统I/O demultiplexer 的最大等待时间
tv_p = &tv;
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/* 依然有未处理的就绪时间,就让I/O demultiplexer立即返回
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}
/* 如果当前没有注册事件,就退出 If we have no events, we just exit */
if
d916
(!event_haveevents(base) && !base->event_count_active) {
event_debug(("%s: no events registered.", __func__));
return (1);
}

/* 更新last wait time,update last old time */
gettime(base, &base->event_tv);

/* clear time cache */
base->tv_cache.tv_sec = 0;

// 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
// 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
res = evsel->dispatch(base, tv_p);

if (res == -1)
return (-1);
gettime(base, &base->tv_cache);// 将time cache赋值为当前系统时间

timeout_process(base);/*检查heap中的timer events,将就绪的timer event从heap上删除
并插入到激活链表中*/

// 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,处理就绪事件
if (base->event_count_active) {
event_process_active(base); //处理激活链表中的就绪event
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}

/* clear time cache */
base->tv_cache.tv_sec = 0;

event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}


注释:

event_sigcb与event_gotsig

为了避免信号竞争,事件API提供了两程变量:event_sigcb 和 event_gotsig.

某个信号的句柄设置event_gotsig表示收到信号

应用程序把event_sigcb设置成一个回调函数.当信号句柄设置了

event_gotsig之后,event_dispatch函数会执行回调函数处理接收到的信号.

当没有事件注册时回调函数返回1.

回调函数可以返回-1表示错误,这将导致event_dispatch()结束,错误代码为EINTR.

1.2 标志符的说明

在event.h文件中,进行相应的标志符定义

#define EVLOOP_ONCE 0x01    /**< Block at most once. */
#define EVLOOP_NONBLOCK 0x02    /**< Do not block. */
...
#define EV_TIMEOUT  0x01
#define EV_READ     0x02
#define EV_WRITE    0x04
#define EV_SIGNAL   0x08
...


1.3 timeout_correct

如果使用的是非monotonic时间,进行检查校正

static void
timeout_correct(struct event_base *base, struct timeval *tv){}


1.4 timeout_process

static void
timeout_process(struct event_base *base)
{
struct timeval now;
struct event *ev;

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
gettime(base, &now);
while ((ev = min_heap_top(&base->timeheap))) {
if (evutil_timercmp(&ev->ev_timeout, &now, >))
break;

/* delete this event from the I/O queues */
event_del_internal(ev);
event_debug(("timeout_process: call %p",ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}


3 I/O 和 Timer 事件的统一

Libevent 将 Timer 和 Signal 事件都统一到了系统的 I/O 的 demultiplex 机制中了

首先将 Timer 事件融合到系统 I/O 多路复用机制中,还是相当清晰的,因为系统的 I/O

机制像 select()和 epoll_wait()都允许程序制定一个最大等待时间(也称最大超时时间)

timeout,即使没有 I/O 事件发生,它们也保证能在 timeout 时间内返回。

那么根据所有 Timer 事件的最小超时时间来设置系统 I/O 的 timeout 时间;

当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统

的 I/O 机制中了

为什么要用堆:

libevent的时间管理数据结构之前是红黑树,红黑树的插入,删除,获取最值时间复杂度都是 O(logN),而堆获取最小key 值(小根堆)的复杂度为 O(1); libevent 采用了堆结构。(nginx用的是红黑树)

4 I/O 和 Signal 事件的统一

Signal 是异步事件的经典事例,将 Signal 事件统一到系统的 I/O 多路复用中就不像 Timer

事件那么自然了, Signal 事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变

量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。

如果当 Signal 发生时,并不立即调用 event 的 callback 函数处理信号,而是设法通知系

统的 I/O 机制,让其返回,然后再统一和 I/O 事件以及 Timer 一起处理,不就可以了嘛。

是的,这也是 libevent 中使用的方法。

问题的核心在于,当 Signal 发生时,如何通知系统的 I/O 多路复用机制,这里先买个小

关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用 pipe。

4.0 集成策略——使用 socket pair

创建好Socket pair,为socket pair的读socket在libevent的event_base实例上注册一个persist的读事件

4.1 信号通知event_base

Libevent 会在事件主循环中检查标记,来确定是否有触发的 signal

以 Epoll 为例,在epoll_dispatch()函数中

res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}
//信号中断错误EINTR,可以忽略这种错误
evsig_process(base);
return (0);
} else if (base->sig.evsig_caught) {
evsig_process(base);
}


其中,evsig_process(base)是处理signal事件函数

4.2 信号处理

static void
evsig_handler(int sig)
{
int save_errno = errno;

if (evsig_base == NULL) {
event_warn(
"%s: received signal %d, but have no base configured",
__func__, sig);
return;
}
// 记录信号sig的触发次数,并设置event触发标记
evsig_base->sig.evsigcaught[sig]++;
evsig_base->sig.evsig_caught = 1;

#ifndef _EVENT_HAVE_SIGACTION
signal(sig, evsig_handler);//重新注册信号
#endif

/*向写socket写一个字节数据,触发event_base的I/O事件,从而通知其有信号触发,需要处理*/
send(evsig_base->sig.ev_signal_pair[0], "a", 1, 0);
errno = save_errno;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: