您的位置:首页 > 数据库 > Redis

结合redis设计与实现的redis源码学习-14-事件(ae.c/ae_epoll.c)

2017-11-28 01:17 645 查看
Redis服务器是一个事件驱动程序,服务器需要处理两类事件:

文件事件(file event):Redis服务器通过套接字与客户端进行连接,而文件事件就是服务器对套接字操作的抽象。服务器与客户端的通信会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作。

时间事件(time event):Redis服务器中的一些操作需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象。

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler):

- 1、文件时间处理器使用I/O多路复用程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。

- 2、当贝坚挺的套接字准备好执行连接accept,read,write,close等操作时,相对应的文件事件就会产生,这好似文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。

事件类型:

- 1、当套接字变得可读时,或者有新的可应答套接字出现时,套接字产生AE_READABLE事件;

- 2、当套接字变得可写时,产生AE_WRITABLE事件。

如果一个套接字同时产生了两个事件,那么服务器会先读,后写套接字。

文件事件处理器:

- 1、连接应答处理器;

- 2、命令请求处理器;

- 3、命令回复处理器;

时间事件:

Redis的时间事件分为以下两类:

- 1、定时事件:让一段程序在指定的时间之后执行一次。

- 2、周期性时间:让一段程序每隔指定时间就执行一次。

时间事件主要由以下三个属性组成:

- 1、id:服务器为时间事件创建的全局唯一ID。ID号从小到大的顺序递增;

- 2、when:毫秒精度的时间戳,记录了时间事件到达的时间;

- 3、timProc:时间事件处理器,一个函数,当时间事件到达时,就会调用相应的处理器来处理事件。

时间事件的实现:

服务器将所有的时间事件放在一个无序链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已到达的时间事件,并调用相应的时间事件处理器。

因为服务器暂时只有serverCron一个时间事件,在benchmark下也只使用两个时间事件,几乎是将无序链表退化成一个指针使用,所以并不影响执行的性能。

看代码:

先来看看ae_epoll.c,我比较熟悉linux下epoll的操作:

#include <sys/epoll.h>
//ae事件结构体
typedef struct aeApiState {
int epfd;//linux下用int来表示文件描述符
struct epoll_event *events;//事件结构体指针
} aeApiState;
//这里创建了epoll的句柄,并且保存到了aeEventLoop结构体中
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel 设置最大连接为1024*/
if (state->epfd == -1) {
zfree(state->events);
zfree(state)
14a3f
;
return -1;
}
eventLoop->apidata = state;
return 0;
}
//更改事件结合的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;

state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
return 0;
}
//关闭epoll的句柄,释放文件事件集合
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;

close(state->epfd);
zfree(state->events);
zfree(state);
}
//给
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning 初始化避免警告*/
/* If the fd was already monitored for some event, we need a MOD operation. Otherwise we need an ADD operation. 如果一个文件描述符已经被监视了一些事件,我们需要一个MOD操作,否则我们需要ADD操作*/
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events合并旧的事件掩码 */
if (mask & AE_READABLE) ee.events |= EPOLLIN;//这个是epoll的读事件
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;//这个是epoll的写事件
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;//更改这个文件描述符的属性,失败返回
return 0;
}
//删除事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);//要保留的事件掩码等于老的掩码与上要删除的掩码取反

ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {//这里如果掩码没有任何功能了就从epoll事件集合中删除这个文件描述符
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for EPOLL_CTL_DEL. 内核版本小于2.6.9需要非空事件指针*/
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);//从epoll集合中删除这个文件描述符
}
}
//等待文件描述符上的I/O事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//等待文件描述符上的I/O事件,如果指定tvp时间那么就会超时,使用events数组接收活跃的事件
if (retval > 0) {//返回值大于0代表有事件处理
int j;

numevents = retval;//epoll_wait的返回值是有事件的描述符个数
for (j = 0; j < numevents; j++) {//遍历活跃的事件集合
int mask = 0;
struct epoll_event *e = state->events+j;//这里访问事件数组

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;将活跃的描述符放入活跃数组中
eventLoop->fired[j].mask = mask;//记录活跃描述符对应的事件
}
}
return numevents;
}
//返回使用的模块名称
static char *aeApiName(void) {
return "epoll";
}


ae.h

#ifndef __AE_H__
#define __AE_H__
#include <time.h>
/* Macros */
#define AE_NOTUSED(V) ((void) V)
struct aeEventLoop;

/* Types and data structures 这里是声明类型数据结构*/
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure 文件事件结构体*/
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) 是可读或者可写之一*/
aeFileProc *rfileProc;//文件事件处理器:读事件
aeFileProc *wfileProc;//文件事件处理器:写事件
void *clientData;//客户端数据
} aeFileEvent;

/* Time event structure 时间事件结构体*/
typedef struct aeTimeEvent {
long long id; /* time event identifier. 时间事件的id*/
long when_sec; /* seconds 秒*/
long when_ms; /* milliseconds 毫秒*/
aeTimeProc *timeProc;//时间事件处理器
aeEventFinalizerProc *finalizerProc;//事件结束处理器
void *clientData;//客户端数据
struct aeTimeEvent *next;//时间事件链表
} aeTimeEvent;

/* A fired event 文件事件结构体*/
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

/* State of an event based program 基于事件的程序的状态*/
typedef struct aeEventLoop {
int maxfd;   /* highest file descriptor currently registered 当前注册的文件描述符的最大值*/
int setsize; /* max number of file descriptors tracked 每次跟踪的最大文件描述符数*/
long long timeEventNextId;//时间事件id
time_t lastTime;     /* Used to detect system clock skew 用于检测系统时间偏移*/
aeFileEvent *events; /* Registered events 注册事件数组*/
aeFiredEvent *fired; /* Fired events 活跃事件数据*/
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data 这用于伦旭特定API的数据*/
aeBeforeSleepProc *beforesleep;//睡前事件
} aeEventLoop;
/* Prototypes 函数原型*/
aeEventLoop *aeCreateEventLoop(int setsize);//创建事件循环
void aeDeleteEventLoop(aeEventLoop *eventLoop);//删除事件循环
void aeStop(aeEventLoop *eventLoop);//停止事件循环
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);//将给定套接字事件加入多epoll的监听范围,并对事件和时间处理器进行关联
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);//让epoll取消对给定套接字的给定事件的监听,并取消事件和事件处理器之间的关联
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);//返回套接字正在被监听的事件类型
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);//将时间事件添加到服务器,关联事件处理器
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);//从服务器中删除id所对应的时间事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);//文件事件分派器
int aeWait(int fd, int mask, long long milliseconds);//在给定时间内祖册并等待套接字给定类型事件的产生
void aeMain(aeEventLoop *eventLoop);//
char *aeGetApiName(void);//获取调用的模块名称
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
int aeGetSetSize(aeEventLoop *eventLoop);//获取接收活跃套接字集合大小
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);//设置接收活跃套接字集合大小

#endif


ae.c

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>

#include "ae.h"
#include "zmalloc.h"
#include "config.h"
#include "ae_epoll.c"//因为我读了epoll.c所以只包含它
//创建循环结构体
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;//创建epoll套接字
/* Events with mask == AE_NONE are not set. So let's initialize the vector with it. 事件掩码是NONE就是没有设置,初始化它*/
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
/* Return the current set size. 返回当前的集合大小*/
int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize;
}
/* Resize the maximum set size of the event loop. If the requested set size is smaller than the current set size, but there is already a file descriptor in use that is >= the requested set size minus one, AE_ERR is returned and the operation is not performed at all. Otherwise AE_OK is returned and the operation is successful. 调整事件循环的最大设置大小,如果请求的设置大小小于当前设置的大小,但是已经有一个正在使用的文件描述符,返回AE_ERR,不执行操作*/
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
int i;

if (setsize == eventLoop->setsize) return AE_OK;//相等,直接返回
if (eventLoop->maxfd >= setsize) return AE_ERR;//小于,错误
if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;//设置失败,返回错误

eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
eventLoop->setsize = setsize;

/* Make sure that if we created new slots, they are initialized with an AE_NONE mask. 如果创建了新的槽,请初始化*/
for (i = eventLoop->maxfd+1; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return AE_OK;
}
//释放套接字
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
//停止循环
void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}
//创建文件事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {//文件描述符大于每次接收的数组大小,错误
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];//获取当前文件描述符的事件

if (aeApiAddEvent(eventLoop, fd, mask) == -1)//将事件和文件描述符加入epoll
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;//关联事件树立起
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;//关联数据
if (fd > eventLoop->maxfd)//如果文件描述符大于最大文件描述符,将最大描述符变成当前描述符
eventLoop->maxfd = fd;
return AE_OK;
}
//删除文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;//文件描述符大于最大值,返回
aeFileEvent *fe = &eventLoop->events[fd];//获取当前文件描述符的事件
if (fe->mask == AE_NONE) return;//该文件描述符没有注册事件

aeApiDelEvent(eventLoop, fd, mask);//删除文件描述掩码的事件
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd 更新最大文件描述符*/
int j;

for (j = eventLoop->maxfd-1; j >= 0; j--)//从最大描述符遍历,直到有事件的文件描述符
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}
//获取文件描述符事件
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];

return fe->mask;
}
//获取当前时间
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;

gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}
//获取事件在多久后执行
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;

aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
//创建时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;//获取新建时间事件的id
aeTimeEvent *te;//创建时间事件

te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);//写入将在什么时间执行
te->timeProc = proc;//时间事件处理器
te->finalizerProc = finalizerProc;//结束处理器
te->clientData = clientData;
te->next = eventLoop->timeEventHead;//头插到循环的时间事件链表
eventLoop->timeEventHead = te;
return id;
}
//删除时间事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
aeTimeEvent *te = eventLoop->timeEventHead;//获取时间事件链表
while(te) {
if (te->id == id) {
te->id = AE_DELETED_EVENT_ID;//这里直接把id变为-1就相当于删除了
return AE_OK;
}
te = te->next;
}
return AE_ERR; /* NO event with the specified ID found 如果指定ID找到的话没有事件*/
}
/* Search the first timer to fire. This operation is useful to know how many time the select can be put in sleep without to delay any event. If there are no timers NULL is returned.搜索第一个定时器,这个操作对于指导徐泽多少次可以放入睡眠而不延迟任何事件是有用的,如果没有定时器,返回NULL
* Note that's O(N) since time events are unsorted. Possible optimizations (not needed by Redis so far, but...):O(N)效率
* 1) Insert the event in order, so that the nearest is just the head. Much better but still insertion or deletion of timers is O(N).按顺序插入事件,以便最近的只是头部,删除O(N)
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N))使用跳跃表会将此操作变为O(log(N)).
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;//获取链表
aeTimeEvent *nearest = NULL;

while(te) {//遍历链表,找到事件最小的事件,每次都要遍历
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);

/* If the system clock is moved to the future, and then set back to the right value, time events may be delayed in a random way. Often this means that scheduled operations will not be performed soon enough. Here we try to detect system clock skews, and force all the time events to be processed ASAP when this happens: the idea is that processing events earlier is less dangerous than delaying them indefinitely, and practice suggests it is. 如果系统的时钟移动到将来,然后在设置回正确的值,则事件事件可能会随机延迟,通常这意味着预定的操作不会很快执行,在这里,我们尝试检测系统时钟偏移,并在发生这种情况是强制所有的时间事件被尽快处理*/
if (now < eventLoop->lastTime) {//如果当前时间小于上一次的时间,代表表被调了
te = eventLoop->timeEventHead;
while(te) {//将所有的任务变成马上执行
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;//更改最后时间

prev = NULL;
te = eventLoop->timeEventHead;//获取链表头
maxId = eventLoop->timeEventNextId-1;//获取最大id
while(te) {
long now_sec, now_ms;
long long id;

/* Remove events scheduled for deletion. 删除预定要删除的事件*/
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}

/* Make sure we don't process time events created by time events in this iteration. Note that this check is currently useless: we always add new timers on the head, however if we change the implementation detail, this check may be useful again: we keep it here for future defense. 确保我们不会在本次迭代中处理由时间事件创建的时间事件*/
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;

id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
//处理每个挂起的时间事件,然后处理每个挂起的文件时缴纳,如果没有特殊标志,函数将会休眠指导某个文件事件触发,或者下一个事件发生时
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no file events to process as long as we want to process time events, in order to sleep until the next time event is ready to fire. 解释没有要处理的文件事件,只要我们想要处理时间事件,就可以调用select(),知道下一次事件准备好触发为止*/
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {//如果可以获取当前最近的时间事件
long now_sec, now_ms;

aeGetTime(&now_sec, &now_ms);
tvp = &tv;

/* How many milliseconds we need to wait for the next time event to fire? 判断我们需要等多久*/
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;

if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return ASAP because of AE_DONT_WAIT we need to set the timeout to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block 其他情况需要阻塞*/
tvp = NULL; /* wait forever */
}
}

numevents = aeApiPoll(eventLoop, tvp);//获取事件个数
for (j = 0; j < numevents; j++) {//遍历处理
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed event removed an element that fired and we still didn't processed, so we check if the event is still valid. 获取已经处理过得持剑溢出一个被触发的元素,我们仍然没有处理,所以检查事件是否仍然有效*/
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events 检查时间事件*/
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);//执行事件事件

return processed; /* return the number of processed file/time events */
}

/* Wait for milliseconds until the given file descriptor becomed writable/readable/exception 等待一定的时间处理文件描述符*/
int aeWait(int fd, int mask, long long milliseconds) {
struct pollfd pfd;
int retmask = 0, retval;

memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;

if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
return retval;
}
}
//main函数,一个loop
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;//停止flag
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)//如果由睡眠函数,执行
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);//遍历事件处理
}
}
//获取模块名称
char *aeGetApiName(void) {
return aeApiName();
}
//设置睡眠事件处理器
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}


总结

-1、Redis服务器是一个事件驱动程序,服务器处理的事件分为时间事件和文件事件两类。

-2、文件事件处理器是基于Reactor模式实现的网络通信程序。

-3、文件事件是对套接字操作的抽象:每次套接字变得acceptable,writable,readable时,相应的文件事件就会产生。

-4、文件事件分为AE_READABLE事件和AE_WRITABLE事件两类。

-5、时间事件分为定时器事件和周期性事件:定时事件只在指定的时间到达一次,而周期性事件则每隔一段时间到达一次。

-6、服务器在一般情况下只执行serverCron函数一个时间事件,并且这个事件是周期性事件。

-7、文件事件和时间事件是合作关系,服务器会轮流处理这两种事件,并且处理事件的过程中也不会进行抢占。

-8、时间事件的实际处理时间通常会比设定的到达时间晚一些。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐