您的位置:首页 > 数据库 > Redis

Redis 源码分析系列1-main函数相关调用分析

2015-07-19 21:12 686 查看
从main函数切入,方便从宏观上掌握redis的运作机制,本篇就从main函数入手,从最上层看,main调用了哪些接口,具体完成了什么功能,然后再聚焦具体的模块。

aeEventLoop是Redis的事件核心数据结构,Redis将aeEventLoop不同平台上的多路分离器进行适配,如select/kqueue/epoll。

为了跨平台,aeEventLoop中定义了void* apidata这一结构,用来有不同平台的分离器进行关联。

typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd;   /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime;     /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;


其中select的apidata结构为:

typedef struct aeApiState {
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds;
} aeApiState;


这里提到额外引入了_rfds,_wfds,是读写句柄集合的拷贝,原因是:select调用完成后,重用fd集合不是安全的(具体原因可以看select函数解析)。

在创建eventloop的时候,对其中eventloop->events数组中的每个event(即eventloop->events[fd])的mask都预置位AE_NONE;

/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;


当第一个事件来临时候,

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;

memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];

if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}


这里对fe->mask进行了判断,如果为AE_NONE,则说明此fd没有可读或者可写事件,跳过本次for循环。

否则分别判断为可读还是可写,并构造fe的mask。最后将此事件关联的fd(即这里的j)添加至fired数组,其为所有已经就绪的fd数组。

显然可知:aeApiPoll的作用是利用多路分离器,复用IO,判断哪些fd可读写,并加入到已就绪数组中,等待被处理

那么aeApiPoll什么时候会被调用呢?

在函数aeProcessEvents中调用,获取已经就绪的fd,完成读写IO。

调用处的关键代码如下:

numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
// 读事件
rfired = 1; // 确保读/写事件只能执行其中一个
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
// 写事件
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}

processed++;


简单分析下具体细节:

// 计算tvp(即多路分离器的timeout,超时时间,对于select的最后一个参数来说,如果为NULL,那么select会阻塞直到有句柄可读写为止)

调用numevents=aeApiPoll(eventloop, tvp),返回可读写的事件个数,遍历eventloop->events数组,此数组以fd作为下标索引事件,而fd则存储在eventloop->fired数组中。

因此从numevents到fe的索引逻辑如下图:



事件可读,则对调用fe->rfileProc;事件可写,那么调用fe->wfileProc进行IO写。而fe->rfileProc,fe->wfileProc的初始化则在initServer中;

initServer的主要代码如下:

// 注册新号处理函数,对于SIGHUP,SIGPIPE,忽略。
// 初始化共享对象

// 关键,初始化redis的事件循环主结构
server.el = aeCreateEventLoop(server.maxclients+1024);

// 初始化作为server的网络连接
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);

//初始化db

// 初始化pubsub

//关键,关联时间时间
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

// 将前面创建的server socket即server.ipfd与redis的事件循环关联。
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");

//初始化bio


其中关键的函数为aeCreateFileEvent,这里将server.ipfd于server.el进行关联,而acceptTcpHandler将用来初始化fe->rfileProc,以及fe->wfileProc。

进入aeCreateFileEvent函数中,此函数比较重要,我们逐行分析:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}


上面对此调用aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)

其中eventloop为redis核心事件循环,fd为server.ipfd,mask为AE_READABLE,proc=acceptTcpHandler,即fe->rfileProc以及fe->wfileProc均为acceptTcpHandler,

clientdata为NULL。

总结为:server端建立服务连接后,用于接收client响应的socket即server.ipfd(用socket(AF_INET,SOCK_STREAM,0)创建出socket),创建的socket监听读事件,对应的事件处理函数为acceptTcpHandler。

查看acceptTcpHandler函数代码:

// 其中调用accept,创建cfd服务客户端
cfd = anetTcpAccept(server.neterr, fd, cip, &cport)

// 为cfd创建redisClient,获取客户端查询命令
// 调用readQueryFromClient()获取客户端buffer。
acceptCommandHandler(cfd,0)


cfd为来自客户端的响应创建的socket,为此客户端服务。在acceptCommandHandler中为cfd创建redisclient。关键代码为:

static void acceptCommonHandler(int fd, int flags) {
redisClient *c;

// 创建新客户端
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resources for the client");
close(fd); /* May be already closed, just ignore errors */
return;
}

/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in nonblocking
* mode and we can send an error for free using the Kernel I/O */

if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";

/* That's a best effort error message, don't check write errors */
// 发送错误信息到客户端
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
// 释放客户端
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}


其中调用createClient为cfd这个句柄创建redisClient,在createClient中对socket设置了O_NONBLOCK以及TCP_NODELAY属性,即非阻塞及不延迟模式。

同时在此函数为cfd添加读事件,监听来自客户端的命令请求。具体的细节,在后面系列中再深入剖析。

对系列1进行总结:从redis.c的main函数入口,做必要的初始化、配置读取、创建redis的核心事件循环eventloop、创建server网络连接,并将事件循环与server的网络连接进行关联,并基于不同平台的IO多路复用机制实现redis自己的多路分离机制。

基于此,可以绘制出易于理解的思维导图:



小旗子中标出的为重点理解部分。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: