libevent札记 - libevent是怎么开始工作的
2017-08-22 13:48
393 查看
原文链接
了解过异步模式的应该知道,异步模式一般也都有回调函数,但在这里,Reactor不是异步的,Reactor更像是一种同步模式框架,尽管可以同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序,实际上数据拷贝还是阻塞的
由于Reactor模式不是异步的,所以可以预见,如果Reactor处理耗时长的操作会造成事件分发的阻塞,影响到后续事件的处理;(Proactor是异步的)
注释:
event_sigcb与event_gotsig
为了避免信号竞争,事件API提供了两程变量:event_sigcb 和 event_gotsig.
某个信号的句柄设置event_gotsig表示收到信号
应用程序把event_sigcb设置成一个回调函数.当信号句柄设置了
event_gotsig之后,event_dispatch函数会执行回调函数处理接收到的信号.
当没有事件注册时回调函数返回1.
回调函数可以返回-1表示错误,这将导致event_dispatch()结束,错误代码为EINTR.
1.2 标志符的说明
在event.h文件中,进行相应的标志符定义
1.3 timeout_correct
如果使用的是非monotonic时间,进行检查校正
1.4 timeout_process
首先将 Timer 事件融合到系统 I/O 多路复用机制中,还是相当清晰的,因为系统的 I/O
机制像 select()和 epoll_wait()都允许程序制定一个最大等待时间(也称最大超时时间)
timeout,即使没有 I/O 事件发生,它们也保证能在 timeout 时间内返回。
那么根据所有 Timer 事件的最小超时时间来设置系统 I/O 的 timeout 时间;
当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统
的 I/O 机制中了
为什么要用堆:
libevent的时间管理数据结构之前是红黑树,红黑树的插入,删除,获取最值时间复杂度都是 O(logN),而堆获取最小key 值(小根堆)的复杂度为 O(1); libevent 采用了堆结构。(nginx用的是红黑树)
事件那么自然了, Signal 事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变
量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。
如果当 Signal 发生时,并不立即调用 event 的 callback 函数处理信号,而是设法通知系
统的 I/O 机制,让其返回,然后再统一和 I/O 事件以及 Timer 一起处理,不就可以了嘛。
是的,这也是 libevent 中使用的方法。
问题的核心在于,当 Signal 发生时,如何通知系统的 I/O 多路复用机制,这里先买个小
关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用 pipe。
以 Epoll 为例,在epoll_dispatch()函数中
其中,evsig_process(base)是处理signal事件函数
1.Reactor模式
libevent采用的是Reactor模式,一种事件列表机制,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。了解过异步模式的应该知道,异步模式一般也都有回调函数,但在这里,Reactor不是异步的,Reactor更像是一种同步模式框架,尽管可以同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序,实际上数据拷贝还是阻塞的
由于Reactor模式不是异步的,所以可以预见,如果Reactor处理耗时长的操作会造成事件分发的阻塞,影响到后续事件的处理;(Proactor是异步的)
2.主循环
2.1 主循环函数event_base_loop(current_base, flags);
event_dispatch-->event_base_loop int event_base_loop(struct event_base *base, int flags) { const struct eventop *evsel = base->evsel; struct timeval tv; struct timeval *tv_p; int res, done; /* clear time cache */ base->tv_cache.tv_sec = 0; if (base->sig.ev_signal_added) evsig_base = base; //处理signal时,指名所属的base实例 done = 0; #ifndef _EVENT_DISABLE_THREAD_SUPPORT base->th_owner_id = EVTHREAD_GET_ID(); #endif while (!done) { /* Terminate the loop if we have been asked to */ if (base->event_gotterm) { base->event_gotterm = 0; break; } if (base->event_break) { base->event_break = 0; break; } /* You cannot use this interface for multi-threaded apps */ while (event_gotsig) { event_gotsig = 0; if (event_sigcb) { res = (*event_sigcb)(); //信号处理回调 if (res == -1) { errno = EINTR; //错误代码,信号中断 return (-1); } } } //校正系统时间 timeout_correct(base, &tv); // 根据 timer heap中事件的最小超时时间,计算系统I/O demultiplexer 的最大等待时间 tv_p = &tv; if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { timeout_next(base, &tv_p); } else { /* 依然有未处理的就绪时间,就让I/O demultiplexer立即返回 * if we have active events, we just poll new events * without waiting. */ evutil_timerclear(&tv); } /* 如果当前没有注册事件,就退出 If we have no events, we just exit */ if d916 (!event_haveevents(base) && !base->event_count_active) { event_debug(("%s: no events registered.", __func__)); return (1); } /* 更新last wait time,update last old time */ gettime(base, &base->event_tv); /* clear time cache */ base->tv_cache.tv_sec = 0; // 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等; // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中 res = evsel->dispatch(base, tv_p); if (res == -1) return (-1); gettime(base, &base->tv_cache);// 将time cache赋值为当前系统时间 timeout_process(base);/*检查heap中的timer events,将就绪的timer event从heap上删除 并插入到激活链表中*/ // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,处理就绪事件 if (base->event_count_active) { event_process_active(base); //处理激活链表中的就绪event if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1; } else if (flags & EVLOOP_NONBLOCK) done = 1; } /* clear time cache */ base->tv_cache.tv_sec = 0; event_debug(("%s: asked to terminate loop.", __func__)); return (0); }
注释:
event_sigcb与event_gotsig
为了避免信号竞争,事件API提供了两程变量:event_sigcb 和 event_gotsig.
某个信号的句柄设置event_gotsig表示收到信号
应用程序把event_sigcb设置成一个回调函数.当信号句柄设置了
event_gotsig之后,event_dispatch函数会执行回调函数处理接收到的信号.
当没有事件注册时回调函数返回1.
回调函数可以返回-1表示错误,这将导致event_dispatch()结束,错误代码为EINTR.
1.2 标志符的说明
在event.h文件中,进行相应的标志符定义
#define EVLOOP_ONCE 0x01 /**< Block at most once. */ #define EVLOOP_NONBLOCK 0x02 /**< Do not block. */ ... #define EV_TIMEOUT 0x01 #define EV_READ 0x02 #define EV_WRITE 0x04 #define EV_SIGNAL 0x08 ...
1.3 timeout_correct
如果使用的是非monotonic时间,进行检查校正
static void timeout_correct(struct event_base *base, struct timeval *tv){}
1.4 timeout_process
static void timeout_process(struct event_base *base) { struct timeval now; struct event *ev; EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); if (min_heap_empty(&base->timeheap)) { EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); return; } gettime(base, &now); while ((ev = min_heap_top(&base->timeheap))) { if (evutil_timercmp(&ev->ev_timeout, &now, >)) break; /* delete this event from the I/O queues */ event_del_internal(ev); event_debug(("timeout_process: call %p",ev->ev_callback)); event_active_internal(ev, EV_TIMEOUT, 1); } EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); }
3 I/O 和 Timer 事件的统一
Libevent 将 Timer 和 Signal 事件都统一到了系统的 I/O 的 demultiplex 机制中了首先将 Timer 事件融合到系统 I/O 多路复用机制中,还是相当清晰的,因为系统的 I/O
机制像 select()和 epoll_wait()都允许程序制定一个最大等待时间(也称最大超时时间)
timeout,即使没有 I/O 事件发生,它们也保证能在 timeout 时间内返回。
那么根据所有 Timer 事件的最小超时时间来设置系统 I/O 的 timeout 时间;
当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统
的 I/O 机制中了
为什么要用堆:
libevent的时间管理数据结构之前是红黑树,红黑树的插入,删除,获取最值时间复杂度都是 O(logN),而堆获取最小key 值(小根堆)的复杂度为 O(1); libevent 采用了堆结构。(nginx用的是红黑树)
4 I/O 和 Signal 事件的统一
Signal 是异步事件的经典事例,将 Signal 事件统一到系统的 I/O 多路复用中就不像 Timer事件那么自然了, Signal 事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变
量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。
如果当 Signal 发生时,并不立即调用 event 的 callback 函数处理信号,而是设法通知系
统的 I/O 机制,让其返回,然后再统一和 I/O 事件以及 Timer 一起处理,不就可以了嘛。
是的,这也是 libevent 中使用的方法。
问题的核心在于,当 Signal 发生时,如何通知系统的 I/O 多路复用机制,这里先买个小
关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用 pipe。
4.0 集成策略——使用 socket pair
创建好Socket pair,为socket pair的读socket在libevent的event_base实例上注册一个persist的读事件4.1 信号通知event_base
Libevent 会在事件主循环中检查标记,来确定是否有触发的 signal以 Epoll 为例,在epoll_dispatch()函数中
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); if (res == -1) { if (errno != EINTR) { event_warn("epoll_wait"); return (-1); } //信号中断错误EINTR,可以忽略这种错误 evsig_process(base); return (0); } else if (base->sig.evsig_caught) { evsig_process(base); }
其中,evsig_process(base)是处理signal事件函数
4.2 信号处理
static void evsig_handler(int sig) { int save_errno = errno; if (evsig_base == NULL) { event_warn( "%s: received signal %d, but have no base configured", __func__, sig); return; } // 记录信号sig的触发次数,并设置event触发标记 evsig_base->sig.evsigcaught[sig]++; evsig_base->sig.evsig_caught = 1; #ifndef _EVENT_HAVE_SIGACTION signal(sig, evsig_handler);//重新注册信号 #endif /*向写socket写一个字节数据,触发event_base的I/O事件,从而通知其有信号触发,需要处理*/ send(evsig_base->sig.ev_signal_pair[0], "a", 1, 0); errno = save_errno; }
相关文章推荐
- 开始和终止工作英语怎么说?
- 自从来到了上海,开始工作以来就没怎么到博客园
- 又是一个星期天,明天又要开始一周的工作了,想想上周的工作情况,不怎么理想。
- 从今天起,开始认真地记录工作与生活
- NTFS文件系统是怎么工作的
- 春节啦!程序员怎么向亲戚介绍你的工作?
- Silverlight 2 教程(一):开始学习Silverlight应用程序的准备工作
- 工作不顺心怎么办?
- 操作系统是怎么工作的——mykernel环境的搭建
- 自己学Docker:4.开始了解Docker的工作模式
- 开始写点东西吧,为我的工作。
- Bug人生--工作一年,情感札记
- centos学习:shell开始 批处理?777权限? rwx怎么控制
- malloc和free是怎么工作的。
- 我们怎么做才能进入Google工作
- 开始工作了
- 笔记本开机只有电池灯亮还可以听见工作声音就是屏幕也没有反应是怎么回事
- 工作周记 - 第九周 (2016/07/18 - 2016/07/22) 这么快就结束了吗?这才真正开始呢!
- 1181链表最好先插入再排序,刚开始边插入边排序不知道怎么出现了错误,以后尽量插入排序分开吧
- windows7系统中怎么隐藏桌面图标提高工作效率保持桌面整洁