您的位置:首页 > 其它

Libevent源码分析(三)--- IO多路复用模型

2016-03-17 12:17 656 查看

eventop

和zeromq一样,libevevnt支持多种 I/O多路复用技术,如epoll、poll、dev/poll、select 和kqueue 等 ,为了统一封装这些模型,libevent定义了eventop结构体:

/** Structure to define the backend of a given event_base. */
struct eventop {
/** The name of this backend. */
const char *name;
/** Function to set up an event_base to use this backend.  It should
* create a new structure holding whatever information is needed to
* run the backend, and return it.  The returned pointer will get
* stored by event_init into the event_base.evbase field.  On failure,
* this function should return NULL. */
void *(*init)(struct event_base *);
/** Enable reading/writing on a given fd or signal.  'events' will be
* the events that we're trying to enable: one or more of EV_READ,
* EV_WRITE, EV_SIGNAL, and EV_ET.  'old' will be those events that
* were enabled on this fd previously.  'fdinfo' will be a structure
* associated with the fd by the evmap; its size is defined by the
* fdinfo field below.  It will be set to 0 the first time the fd is
* added.  The function should return 0 on success and -1 on error.
*/
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** As "add", except 'events' contains the events we mean to disable. */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** Function to implement the core of an event loop.  It must see which
added events are ready, and cause event_active to be called for each
active event (usually via event_io_active or such).  It should
return 0 on success and -1 on error.
*/
int (*dispatch)(struct event_base *, struct timeval *);
/** Function to clean up and free our data from the event_base. */
void (*dealloc)(struct event_base *);
/** Flag: set if we need to reinitialize the event base after we fork.
*/
int need_reinit;
/** Bit-array of supported event_method_features that this backend can
* provide. */
enum event_method_feature features;
/** Length of the extra information we should record for each fd that
has one or more active events.  This information is recorded
as part of the evmap entry for each fd, and passed as an argument
to the add and del functions above.
*/
size_t fdinfo_len;
};


eventop 封装各个模型的基本操作,所有模型都要定义一个该类型结构体,将各个函数指针指向自己对应的实现。features则标记了每个模型支持的特性.可以取以下值:

enum event_method_feature {
/** Require an event method that allows edge-triggered events with EV_ET. */
EV_FEATURE_ET = 0x01,
/** Require an event method where having one event triggered among
* many is [approximately] an O(1) operation. This excludes (for
* example) select and poll, which are approximately O(N) for N
* equal to the total number of possible events. */
EV_FEATURE_O1 = 0x02,
/** Require an event method that allows file descriptors as well as
* sockets. */
EV_FEATURE_FDS = 0x04
};


EV_FEATURE_ET 标记该模型是否支持边缘触发,关于水平触发和边缘触发网上有很多资料,这里不贴了。epoll和kqueue都支持这种模式。EV_FEATURE_O1标记该模型时间复杂度是否为o(1),比如select每次返回都需要轮询所有的fd,而epoll和kequeue则不需要。最后一个EV_FEATURE_FDS 标记该模型是否支持文件类型的fd,epoll不支持EV_FEATURE_FDS。

在event_base中的前两个变量分别指向这个eventop 结构体以及该结构体指定的init返回的void*指针。

select

select模型效率比较低,但是它是最典型的也是最原始的多路复用技术,libevent实现了两个版本的select,一个是针对非windows平台的select,另一个是针对windows平台的winselect。

每一个 I/O多路复用模型都需要定义一个eventop结构体,下面是select对应的eventop :

const struct eventop selectops = {
"select",
select_init,
select_add,
select_del,
select_dispatch,
select_dealloc,
0, /* doesn't need reinit. */
EV_FEATURE_FDS,
0,
};


这个结构体指定了select的基本操作,除此之外,select还定义了另一个结构体:

struct selectop {
int event_fds;      /* Highest fd in fd set */
int event_fdsz;
int resize_out_sets;
fd_set *event_readset_in;
fd_set *event_writeset_in;
fd_set *event_readset_out;
fd_set *event_writeset_out;
};


event_base中的前两个变量第一个用来指向eventop,第二个用来指向eventop中定义的select_init的返回值,在select中就是指向一个selectop结果体:

static void * select_init(struct event_base *base)
{
struct selectop *sop;
if (!(sop = mm_calloc(1, sizeof(struct selectop))))
return (NULL);

if (select_resize(sop, SELECT_ALLOC_SIZE(32 + 1))) {
select_free_selectop(sop);
return (NULL);
}
evsig_init(base);

return (sop);
}


fd_set在不同平台上定义不一样,但是除了windows系统外基本都是一个int类型的数组,fd_set用每一位表示一个对应的套接字,即它会用数组的第一个int表示0-31的fd,用第二个表示32-63的套接字,数组默认支持1024个套接字,但是libevent为了效率,没有直接使用fd_set,而是使用了fd_set指针,但是为它内部数组分配的大小初始化是32+1的大小,这样可以节省一部分内存,当最大套接字超过当前能够表示的最大值时则需要扩容:

/* Divide positive x by y, rounding up. */
#define DIV_ROUNDUP(x, y)   (((x)+((y)-1))/(y))

/* How many bytes to allocate for N fds? */
#define SELECT_ALLOC_SIZE(n) (DIV_ROUNDUP(n, NFDBITS) * sizeof(fd_mask))

static int select_resize(struct selectop *sop, int fdsz)
{
fd_set *readset_in = NULL;
fd_set *writeset_in = NULL;

if (sop->event_readset_in)
check_selectop(sop);

if ((readset_in = mm_realloc(sop->event_readset_in, fdsz)) == NULL)
goto error;
sop->event_readset_in = readset_in;
if ((writeset_in = mm_realloc(sop->event_writeset_in, fdsz)) == NULL) {
/* Note that this will leave event_readset_in expanded.
* That's okay; we wouldn't want to free it, since that would
* change the semantics of select_resize from "expand the
* readset_in and writeset_in, or return -1" to "expand the
* *set_in members, or trash them and return -1."
*/
goto error;
}
sop->event_writeset_in = writeset_in;
sop->resize_out_sets = 1;

memset((char *)sop->event_readset_in + sop->event_fdsz, 0,
fdsz - sop->event_fdsz);
memset((char *)sop->event_writeset_in + sop->event_fdsz, 0,
fdsz - sop->event_fdsz);

sop->event_fdsz = fdsz;
check_selectop(sop);

return (0);

error:
event_warn("malloc");
return (-1);
}


这组宏定义计算对于最大值为n的fd需要申请多大字节的内存。select_resize方法则是根据当前的最大fd来进行扩容操作。这样select内的套接字的数量就可以动态改变了。

select的select_add和select_del方法比较简单,都是都过FD_SET宏向d_set中添加或者删除套接字,select_dispatch同样比较简单,为了公平,select_dispatch不会从最小的套接字开始遍历,而是从随机位置开始。

static int
select_dispatch(struct event_base *base, struct timeval *tv)
{
int res=0, i, j, nfds;
struct selectop *sop = base->evbase;

check_selectop(sop);
if (sop->resize_out_sets) {
fd_set *readset_out=NULL, *writeset_out=NULL;
size_t sz = sop->event_fdsz;
if (!(readset_out = mm_realloc(sop->event_readset_out, sz)))
return (-1);
sop->event_readset_out = readset_out;
if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) {
/* We don't free readset_out here, since it was
* already successfully reallocated. The next time
* we call select_dispatch, the realloc will be a
* no-op. */
return (-1);
}
sop->event_writeset_out = writeset_out;
sop->resize_out_sets = 0;
}

memcpy(sop->event_readset_out, sop->event_readset_in,
sop->event_fdsz);
memcpy(sop->event_writeset_out, sop->event_writeset_in,
sop->event_fdsz);

nfds = sop->event_fds+1;

EVBASE_RELEASE_LOCK(base, th_base_lock);

res = select(nfds, sop->event_readset_out,
sop->event_writeset_out, NULL, tv);

EVBASE_ACQUIRE_LOCK(base, th_base_lock);

check_selectop(sop);

if (res == -1) {
if (errno != EINTR) {
event_warn("select");
return (-1);
}

return (0);
}

event_debug(("%s: select reports %d", __func__, res));

check_selectop(sop);
i = random() % nfds;
for (j = 0; j < nfds; ++j) {
if (++i >= nfds)
i = 0;
res = 0;
if (FD_ISSET(i, sop->event_readset_out))
res |= EV_READ;
if (FD_ISSET(i, sop->event_writeset_out))
res |= EV_WRITE;

if (res == 0)
continue;

evmap_io_active(base, i, res);
}
check_selectop(sop);

return (0);
}


evmap_io_active方法是套接字状态改变时调用的方法,在后面的章节中会详细分析。

epoll

epoll是目前linux下最好的io多路复用模型。他是对poll的改进,支持边缘触发,并且可以处理大批量fd。在zeromq的源码分析中也是用select和epoll做例子的。libevent的epoll支持changelist模式,之前提过。如果使用changelist模式,并且在两次dispatch之间分别做了一个add和一个del,则忽略这两次事件,可以减少两次系统调用。

static const struct eventop epollops_changelist = {
"epoll (with changelist)",
epoll_init,
event_changelist_add,
event_changelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1,
EVENT_CHANGELIST_FDINFO_SIZE
};

const struct eventop epollops = {
"epoll",
epoll_init,
epoll_nochangelist_add,
epoll_nochangelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1,
0
};


下面重点分析epoll (with changelist)模式的工作原理:

static void *
epoll_init(struct event_base *base)
{
int epfd;
struct epollop *epollop;

/* Initialize the kernel queue.  (The size field is ignored since
* 2.6.8.) */
if ((epfd = epoll_create(32000)) == -1) {
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
}

evutil_make_socket_closeonexec(epfd);

if (!(epollop = mm_calloc(1, sizeof(struct epollop)))) {
close(epfd);
return (NULL);
}

epollop->epfd = epfd;

/* Initialize fields */
epollop->events = mm_calloc(INITIAL_NEVENT, sizeof(struct epoll_event));
if (epollop->events == NULL) {
mm_free(epollop);
close(epfd);
return (NULL);
}
epollop->nevents = INITIAL_NEVENT;

if ((base->flags & EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST) != 0 ||
((base->flags & EVENT_BASE_FLAG_IGNORE_ENV) == 0 &&
evutil_getenv("EVENT_EPOLL_USE_CHANGELIST") != NULL))
base->evsel = &epollops_changelist;

evsig_init(base);

return (epollop);
}


初始化方法比较简单,主要是初始化一个epollop 结构体,创建一个epoll套接字。然后初始化INITIAL_NEVENT大小的events ,这些event用于dispatch方法中调用epoll_wait时作为参数使用。init还会检查环境设置来决定是否使用changelist模式。

epoll_nochangelist_add和epoll_nochangelist_del模式比较简单,这里主要分析event_changelist_add和event_changelist_del模式。这两个方法定义在evmap中,epoll和kequeue都需要使用这两个方法。但是epoll不使用changelist处理信号事件,而kqueue则可以使用changelist处理信号量事件,因为kqueue本身就支持信号量事件。

int
event_changelist_add(struct event_base *base, evutil_socket_t fd, short old, short events,
void *p)
{
struct event_changelist *changelist = &base->changelist;
struct event_changelist_fdinfo *fdinfo = p;
struct event_change *change;

event_changelist_check(base);

change = event_changelist_get_or_construct(changelist, fd, old, fdinfo);
if (!change)
return -1;

/* An add replaces any previous delete, but doesn't result in a no-op,
* since the delete might fail (because the fd had been closed since
* the last add, for instance. */

if (events & (EV_READ|EV_SIGNAL)) {
change->read_change = EV_CHANGE_ADD |
(events & (EV_ET|EV_PERSIST|EV_SIGNAL));
}
if (events & EV_WRITE) {
change->write_change = EV_CHANGE_ADD |
(events & (EV_ET|EV_PERSIST|EV_SIGNAL));
}

event_changelist_check(base);
return (0);
}

int
event_changelist_del(struct event_base *base, evutil_socket_t fd, short old, short events,
void *p)
{
struct event_changelist *changelist = &base->changelist;
struct event_changelist_fdinfo *fdinfo = p;
struct event_change *change;

event_changelist_check(base);
change = event_changelist_get_or_construct(changelist, fd, old, fdinfo);
event_changelist_check(base);
if (!change)
return -1;

/* A delete removes any previous add, rather than replacing it:
on those platforms where "add, delete, dispatch" is not the same
as "no-op, dispatch", we want the no-op behavior.

As well as checking the current operation we should also check
the original set of events to make sure were not ignoring
the case where the add operation is present on an event that
was already set.

If we have a no-op item, we could remove it it from the list
entirely, but really there's not much point: skipping the no-op
change when we do the dispatch later is far cheaper than rejuggling
the array now.

As this stands, it also lets through deletions of events that are
not currently set.
*/

if (events & (EV_READ|EV_SIGNAL)) {
if (!(change->old_events & (EV_READ | EV_SIGNAL)) &&
(change->read_change & EV_CHANGE_ADD))
change->read_change = 0;
else
change->read_change = EV_CHANGE_DEL;
}
if (events & EV_WRITE) {
if (!(change->old_events & EV_WRITE) &&
(change->write_change & EV_CHANGE_ADD))
change->write_change = 0;
else
change->write_change = EV_CHANGE_DEL;
}

event_changelist_check(base);
return (0);
}


这里需要注意的是event_changelist_del会把之前的event_changelist_add取消,但是event_changelist_add会替代之前的event_changelist_del。另外evmap_signal和evmap_io在初始化是都会扩展额外的数据,比如GET_IO_SLOT_AND_CTOR中的以下代码:

_ent = mm_calloc(1,sizeof(struct event_map_entry)+fdinfo_len)


fdinfo_len其实就是EVENT_CHANGELIST_FDINFO_SIZE,用来保存在changelist中的索引。每次通过evsel调用add时候,evmap_io和evmap_signal一定已经在event_signal_map和event_io_map中了,所以可以根据fd找到event_changelist_fdinfo,继而找到对应的索引:

/** Helper: return the changelist_fdinfo corresponding to a given change. */
static inline struct event_changelist_fdinfo *
event_change_get_fdinfo(struct event_base *base,
const struct event_change *change)
{
char *ptr;
if (change->read_change & EV_CHANGE_SIGNAL) {
struct evmap_signal *ctx;
GET_SIGNAL_SLOT(ctx, &base->sigmap, change->fd, evmap_signal);
ptr = ((char*)ctx) + sizeof(struct evmap_signal);
} else {
struct evmap_io *ctx;
GET_IO_SLOT(ctx, &base->io, change->fd, evmap_io);
ptr = ((char*)ctx) + sizeof(struct evmap_io);
}
return (void*)ptr;
}


每次调用dispatch时,都要把上次dispatch和本次dispatch之间所有的change都应用到epoll中。

static int epoll_apply_one_change(struct event_base *base,struct epollop *epollop,const struct event_change *ch)
{
struct epoll_event epev;
int op, events = 0;

if (1) {
/* The logic here is a little tricky.  If we had no events set
on the fd before, we need to set op="ADD" and set
events=the events we want to add.  If we had any events set
on the fd before, and we want any events to remain on the
fd, we need to say op="MOD" and set events=the events we
want to remain.  But if we want to delete the last event,
we say op="DEL" and set events=the remaining events.  What
fun!
*/

/* TODO: Turn this into a switch or a table lookup. */

if ((ch->read_change & EV_CHANGE_ADD) ||
(ch->write_change & EV_CHANGE_ADD)) {
/* If we are adding anything at all, we'll want to do
* either an ADD or a MOD. */
events = 0;
op = EPOLL_CTL_ADD;
if (ch->read_change & EV_CHANGE_ADD) {
events |= EPOLLIN;
} else if (ch->read_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_READ) {
events |= EPOLLIN;
}
if (ch->write_change & EV_CHANGE_ADD) {
events |= EPOLLOUT;
} else if (ch->write_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_WRITE) {
events |= EPOLLOUT;
}
if ((ch->read_change|ch->write_change) & EV_ET)
events |= EPOLLET;

if (ch->old_events) {
/* If MOD fails, we retry as an ADD, and if
* ADD fails we will retry as a MOD.  So the
* only hard part here is to guess which one
* will work.  As a heuristic, we'll try
* MOD first if we think there were old
* events and ADD if we think there were none.
*
* We can be wrong about the MOD if the file
* has in fact been closed and re-opened.
*
* We can be wrong about the ADD if the
* the fd has been re-created with a dup()
* of the same file that it was before.
*/
op = EPOLL_CTL_MOD;
}
} else if ((ch->read_change & EV_CHANGE_DEL) ||
(ch->write_change & EV_CHANGE_DEL)) {
/* If we're deleting anything, we'll want to do a MOD
* or a DEL. */
op = EPOLL_CTL_DEL;

if (ch->read_change & EV_CHANGE_DEL) {
if (ch->write_change & EV_CHANGE_DEL) {
events = EPOLLIN|EPOLLOUT;
} else if (ch->old_events & EV_WRITE) {
events = EPOLLOUT;
op = EPOLL_CTL_MOD;
} else {
events = EPOLLIN;
}
} else if (ch->write_change & EV_CHANGE_DEL) {
if (ch->old_events & EV_READ) {
events = EPOLLIN;
op = EPOLL_CTL_MOD;
} else {
events = EPOLLOUT;
}
}
}

if (!events)
return 0;

memset(&epev, 0, sizeof(epev));
epev.data.fd = ch->fd;
epev.events = events;
if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1) {
if (op == EPOLL_CTL_MOD && errno == ENOENT) {
/* If a MOD operation fails with ENOENT, the
* fd was probably closed and re-opened.  We
* should retry the operation as an ADD.
*/
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
(int)epev.events,
ch->fd));
}
} else if (op == EPOLL_CTL_ADD && errno == EEXIST) {
/* If an ADD operation fails with EEXIST,
* either the operation was redundant (as with a
* precautionary add), or we ran into a fun
* kernel bug where using dup*() to duplicate the
* same file into the same fd gives you the same epitem
* rather than a fresh one.  For the second case,
* we must retry with MOD. */
if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
(int)epev.events,
ch->fd));
}
} else if (op == EPOLL_CTL_DEL &&
(errno == ENOENT || errno == EBADF ||
errno == EPERM)) {
/* If a delete fails with one of these errors,
* that's fine too: we closed the fd before we
* got around to calling epoll_dispatch. */
event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
(int)epev.events,
ch->fd,
strerror(errno)));
} else {
event_warn("Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d (%s); write change was %d (%s)",
epoll_op_to_string(op),
(int)epev.events,
ch->fd,
ch->old_events,
ch->read_change,
change_to_string(ch->read_change),
ch->write_change,
change_to_string(ch->write_change));
return -1;
}
} else {
event_debug(("Epoll %s(%d) on fd %d okay. [old events were %d; read change was %d; write change was %d]",
epoll_op_to_string(op),
(int)epev.events,
(int)ch->fd,
ch->old_events,
ch->read_change,
ch->write_change));
}
}
return 0;
}


这里需要特殊处理的是EPOLL_CTL_ADD操作如果失败并且返回EEXIST错误则需要转用EPOLL_CTL_MOD,如果使用EPOLL_CTL_MOD失败并且错误码是ENOENT则需要使用EPOLL_CTL_ADD处理。另外如果设置了边缘触发模式,则需要添加EPOLLET标记。

static int
epoll_dispatch(struct event_base *base, struct timeval *tv)
{
struct epollop *epollop = base->evbase;
struct epoll_event *events = epollop->events;
int i, res;
long timeout = -1;

if (tv != NULL) {
timeout = evutil_tv_to_msec(tv);
if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC) {
/* Linux kernels can wait forever if the timeout is
* too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
}

epoll_apply_changes(base);
event_changelist_remove_all(&base->changelist, base);

EVBASE_RELEASE_LOCK(base, th_base_lock);

res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}

return (0);
}

event_debug(("%s: epoll_wait reports %d", __func__, res));
EVUTIL_ASSERT(res <= epollop->nevents);

for (i = 0; i < res; i++) {
int what = events[i].events;
short ev = 0;

if (what & (EPOLLHUP|EPOLLERR)) {
ev = EV_READ | EV_WRITE;
} else {
if (what & EPOLLIN)
ev |= EV_READ;
if (what & EPOLLOUT)
ev |= EV_WRITE;
}

if (!ev)
continue;

evmap_io_active(base, events[i].data.fd, ev | EV_ET);
}

if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) {
/* We used all of the event space this time.  We should
be ready for more events next time. */
int new_nevents = epollop->nevents * 2;
struct epoll_event *new_events;

new_events = mm_realloc(epollop->events,
new_nevents * sizeof(struct epoll_event));
if (new_events) {
epollop->events = new_events;
epollop->nevents = new_nevents;
}
}

return (0);
}


epoll_dispatch比较简单,首先把所有的change都应用到 epoll中,然后清空changlist。epoll_wait只会返回有事件的events,不需要象select一样轮询,这也是epoll比select高效的原因之一。注意当返回的event数量和等于epollop->nevents时说明可能会有更多的event有事件,此时需要为epollop的events扩容,以减少epoll_wait的系统调用。

iocp

iocp是windows上效率最高的通讯模型,zeromq没有实现它,但是libevent实现了,不过是结合buffer_event实现的,所以iocp的分析将在buffer_event章节一起介绍。

signal

select和epoll在init方法中都调用了一个evsig_init方法,因为select和epoll本身不处理信号量事件,所以需要使用signal.c文件中的信号量处理方法。evsignal-internal.h中定义了一个结构体:

struct evsig_info {
/* Event watching ev_signal_pair[1] */
struct event ev_signal;
/* Socketpair used to send notifications from the signal handler */
evutil_socket_t ev_signal_pair[2];
/* True iff we've added the ev_signal event yet. */
int ev_signal_added;
/* Count of the number of signals we're currently watching. */
int ev_n_signals_added;

/* Array of previous signal handler objects before Libevent started
* messing with them.  Used to restore old signal handlers. */
#ifdef _EVENT_HAVE_SIGACTION
struct sigaction **sh_old;
#else
ev_sighandler_t **sh_old;
#endif
/* Size of sh_old. */
int sh_old_max;
};


ev_signal是用于监听socketpari中ev_signal_pair[1],ev_signal_pair是一个socketpair,在zeromq中同样使用过socketpair,libevent中socketpari的创建和zeromq比较类似。ev_signal_added用于标记ev_signal是否已经添加到event_base中。ev_n_signals_added用于标记event_base监听信号量的数目。sh_old用于存储之前信号量的默认处理方法,当libevent取消对某一个信号量的监听时需要恢复回默认的处理方式。

与select和epoll相同,singal.c文件同样定义了一个eventop结果体,并且在evsig_init中将base->evsigsel指向该结构体:

static const struct eventop evsigops = {
"signal",
NULL,
evsig_add,
evsig_del,
NULL,
NULL,
0, 0, 0
};

int evsig_init(struct event_base *base)
{
/*
* Our signal handler is going to write to one end of the socket
* pair to wake up our event loop.  The event loop then scans for
* signals that got delivered.
*/
if (evutil_socketpair(
AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair) == -1) {
#ifdef WIN32
/* Make this nonfatal on win32, where sometimes people
have localhost firewalled. */
event_sock_warn(-1, "%s: socketpair", __func__);
#else
event_sock_err(1, -1, "%s: socketpair", __func__);
#endif
return -1;
}

evutil_make_socket_closeonexec(base->sig.ev_signal_pair[0]);
evutil_make_socket_closeonexec(base->sig.ev_signal_pair[1]);
base->sig.sh_old = NULL;
base->sig.sh_old_max = 0;

evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]);
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);

event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],
EV_READ | EV_PERSIST, evsig_cb, base);

base->sig.ev_signal.ev_flags |= EVLIST_INTERNAL;
event_priority_set(&base->sig.ev_signal, 0);

base->evsigsel = &evsigops;

return 0;
}


evsig_add用于监听一个信号量:

static int evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
struct evsig_info *sig = &base->sig;
(void)p;

EVUTIL_ASSERT(evsignal >= 0 && evsignal < NSIG);

/* catch signals if they happen quickly */
EVSIGBASE_LOCK();
if (evsig_base != base && evsig_base_n_signals_added) {
event_warnx("Added a signal to event base %p with signals "
"already added to event_base %p.  Only one can have "
"signals at a time with the %s backend.  The base with "
"the most recently added signal or the most recent "
"event_base_loop() call gets preference; do "
"not rely on this behavior in future Libevent versions.",
base, evsig_base, base->evsel->name);
}
evsig_base = base;
evsig_base_n_signals_added = ++sig->ev_n_signals_added;
evsig_base_fd = base->sig.ev_signal_pair[0];
EVSIGBASE_UNLOCK();

event_debug(("%s: %d: changing signal handler", __func__, (int)evsignal));
if (_evsig_set_handler(base, (int)evsignal, evsig_handler) == -1) {
goto err;
}

if (!sig->ev_signal_added) {
if (event_add(&sig->ev_signal, NULL))
goto err;
sig->ev_signal_added = 1;
}

return (0);

err:
EVSIGBASE_LOCK();
--evsig_base_n_signals_added;
--sig->ev_n_signals_added;
EVSIGBASE_UNLOCK();
return (-1);
}


其中_evsig_set_handler方法会设置当前的信号量的处理函数为evsig_handler并且保存旧的处理方式。如果ev_signal没有添加到event_base中则应该添加以便监听ev_signal_pair[1],这样当有事件发生时evsig_handler就会背调用:

static void __cdecl
evsig_handler(int sig)
{
int save_errno = errno;
#ifdef WIN32
int socket_errno = EVUTIL_SOCKET_ERROR();
#endif
ev_uint8_t msg;

if (evsig_base == NULL) {
event_warnx(
"%s: received signal %d, but have no base configured",
__func__, sig);
return;
}

#ifndef _EVENT_HAVE_SIGACTION
signal(sig, evsig_handler);
#endif

/* Wake up our notification mechanism */
msg = sig;
send(evsig_base_fd, (char*)&msg, 1, 0);
errno = save_errno;
#ifdef WIN32
EVUTIL_SET_SOCKET_ERROR(socket_errno);
#endif
}


evsig_handler会把接收到的信号量事件通过socket_pair发送给ev_signal_pair[1],而ev_signal_pair[1]对应的event已经被添加到event_base中,所以此时evsig_cb会被调用:

static void evsig_cb(evutil_socket_t fd, short what, void *arg)
{
static char signals[1024];
ev_ssize_t n;
int i;
int ncaught[NSIG];
struct event_base *base;

base = arg;

memset(&ncaught, 0, sizeof(ncaught));

while (1) {
n = recv(fd, signals, sizeof(signals), 0);
if (n == -1) {
int err = evutil_socket_geterror(fd);
if (! EVUTIL_ERR_RW_RETRIABLE(err))
event_sock_err(1, fd, "%s: recv", __func__);
break;
} else if (n == 0) {
/* XXX warn? */
break;
}
for (i = 0; i < n; ++i) {
ev_uint8_t sig = signals[i];
if (sig < NSIG)
ncaught[sig]++;
}
}

EVBASE_ACQUIRE_LOCK(base, th_base_lock);
for (i = 0; i < NSIG; ++i) {
if (ncaught[i])
evmap_signal_active(base, i, ncaught[i]);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
}


evsig_cb会调用evmap_signal_active把所有注册的signal事件变成激活状态,这样信号量时间也被集成到event中统一处理了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: