您的位置:首页 > 数据库 > Redis

Redis运行流程源码解析

2016-05-02 13:03 706 查看
作者:nosqlfan

本文来自@凡趣科技
pesiwang同学的投稿分享,对Redis运行流程,命令处理的内部实现进行了深入讲解。

本文分析源码基于
Redis 2.4.7 stable 版本。

概述

Redis通过定义一个 struct redisServer 类型的全局变量server 来保存服务器的相关信息(比如:配置信息,统计信息,服务器状态等等)。启动时通过读取配置文件里边的信息对server进行初始化(如果没有指定配置文件,将使用默认值对sever进行初始化),初始化的内容有:起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,虚拟内存初始化,log初始化等等。

启动

初始化服务器配置

先来看看redis 的main函数的入口

Redis.c:1694

int main(int argc, char **argv) {
time_t start;

initServerConfig();
if (argc == 2) {
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0) usage();
resetServerSaveParams();
loadServerConfig(argv[1]);
} else if ((argc > 2)) {
usage();
} else {
...
}
if (server.daemonize) daemonize();
initServer();
...


initServerConfig初始化全局变量 server 的属性为默认值。
如果命令行指定了配置文件, resetServerSaveParams重置对落地备份的配置(即重置为默认值)并读取配置文件的内容对全局变量 server 再进行初始化 ,没有在配置文件中配置的将使用默认值。
如果服务器配置成后台执行,则对服务器进行 daemonize。
initServer初始化服务器,主要是设置信号处理函数,初始化事件轮询,起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,初始化虚拟内存和log等等。
创建服务器监听端口。

Redis.c:923

if (server.port != 0) {
server.ipfd= anetTcpServer(server.neterr,server.port,server.bindaddr);
if (server.ipfd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening port %d: %s",
server.port, server.neterr);
exit(1);
}
}


anetTcpServer创建一个socket并进行监听,然后把返回的socket fd赋值给server.ipfd。

事件轮询结构体定义

先看看事件轮询的结构体定义

Ae.h:88

/* State of an event based program */
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;


maxfd是最大的文件描述符,主要用来判断是否有文件事件需要处理(ae.c:293)和当使用select 来处理网络IO时作为select的参数(ae_select.c:50)。
timeEventNextId 是下一个定时事件的ID。
events[AE_SETSIZE]用于保存通过aeCreateFileEvent函数创建的文件事件,在sendReplyToClient函数和freeClient函数中通过调用aeDeleteFileEvent函数删除已经处理完的事件。
fired[AE_SETSIZE]用于保存已经触发的文件事件,在对应的网络I/O函数中进行赋值(epoll,select,kqueue),不会对fired进行删除操作,只会一直覆盖原来的值。然后在aeProcessEvents函数中对已经触发的事件进行处理。
timeEventHead 是定时事件链表的头,定时事件的存储用链表实现。
Stop 用于停止事件轮询处理。
apidata 用于保存轮询api需要的数据,即aeApiState结构体,对于epoll来说,aeApiState结构体的定义如下:
typedef struct aeApiState {
int epfd;
struct epoll_event events[AE_SETSIZE];
} aeApiState;


beforesleep 是每次进入处理事件时执行的函数。

创建事件轮询

Redis.c:920

server.el = aeCreateEventLoop();
Ae.c:55
aeEventLoop *aeCreateEventLoop(void) {
aeEventLoop *eventLoop;
int i;

eventLoop = zmalloc(sizeof(*eventLoop));
if (!eventLoop) return NULL;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) {
zfree(eventLoop);
return NULL;
}
/* Events with mask == AE_NONE are not set. So let's initialize
* the vector with it. */
for (i = 0; i < AE_SETSIZE; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}


绑定定时函数和有新连接时的回调函数

redis.c:973

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 &&
aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");


aeCreateTimeEvent创建定时事件并绑定回调函数serverCron,这个定时事件第一次是超过1毫秒就有权限执行,如果其他事件的处理时间比较长,可能会出现超过一定时间都没执行情况。这里的1毫秒只是超过后有可执行的权限,并不是一定会执行。第一次执行后,如果还要执行,是由定时函数的返回值确定的,在processTimeEvents(ae.c:219)中,当调用定时回调函数后,获取定时回调函数的返回值,如果返回值不等于-1,则设置定时回调函数的下一次触发时间为当前时间加上定时回调函数的返回值,即调用间隔时间。serverCron的返回值是100ms,表明从二次开始,每超过100ms就有权限执行。(定时回调函数serverCron用于更新lru时钟,更新服务器的状态,打印一些服务器信息,符合条件的情况下对hash表进行重哈希,启动后端写AOF或者检查后端写AOF或者备份是否完成,检查过期的KEY等等)
aeCreateFileEvent创建监听端口的socket fd的文件读事件(即注册网络io事件)并绑定回调函数acceptTcpHandler。

进入事件轮询

初始化后将进入事件轮询

Redis.c:1733

aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
设置每次进入事件处理前会执行的函数beforeSleep。

进入事件轮询aeMain。
退出事件轮询后删除事件轮询,释放事件轮询占用内存aeDeleteEventLoop(不过没在代码中发现有执行到这一步的可能,服务器接到shutdown命令时通过一些处理后直接就通过exit退出了,可能是我看错了,待验证)。

事件轮询函数aeMain

看看aeMain的内容

Ae.c:382

void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}


每次进入事件处理前,都会调用设置的beforesleep,beforeSleep函数主要是处理被阻塞的命令和根据配置写AOF。
aeProcessEvents处理定时事件和网络io事件。

启动完毕,等待客户端请求

到进入事件轮询函数后,redis的启动工作就做完了,接下来就是等待客户端的请求了。

接收请求

新连接到来时的回调函数

在绑定定时函数和有新连接时的回调函数中说到了绑定有新连接来时的回调函数acceptTcpHandler,现在来看看这个函数的具体内容

Networking.c:427

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[128];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);

cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd);
}


anetTcpAccept 函数 accept新连接,返回的cfd是新连接的socket fd。
acceptCommonHandler 函数是对新建立的连接进行处理,这个函数在使用 unix socket 时也会被用到。

接收客户端的新连接

接下来看看anetTcpAccept函数的具体内容

Anet.c:330
int anetTcpAccept(char *err, int s, char *ip, int *port) {
int fd;
struct sockaddr_in sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
return ANET_ERR;

if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
if (port) *port = ntohs(sa.sin_port);
return fd;
}


再进去anetGenericAccept 看看

Anet.c:313

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
fd = accept(s,sa,len);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}


anetTcpAccept 函数中调用anetGenericAccept 函数进行接收新连接,anetGenericAccept函数在 unix socket 的新连接处理中也会用到。
anetTcpAccept 函数接收新连接后,获取客户端得ip,port 并返回。

创建redisClient进行接收处理

anetTcpAccept 运行完后,返回新连接的socket fd, 然后返回到调用函数acceptTcpHandler中,继续执行acceptCommonHandler 函数

Networking.c:403

static void acceptCommonHandler(int fd) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(fd); /* May be already closed, just ingore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in nonblocking
* mode and we can send an error for free using the Kernel I/O */
if (server.maxclients && listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";

/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
freeClient(c);
return;
}
server.stat_numconnections++;
}


创建一个 redisClient 来处理新连接,每个连接都会创建一个 redisClient 来处理。
如果配置了最大并发客户端,则对现有的连接数进行检查和处理。
最后统计连接数。

绑定有数据可读时的回调函数

Networking.c:15

redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
c->bufpos = 0;

anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}

selectDb(c,0);
c->fd = fd;
c->querybuf = sdsempty();
c->reqtype = 0;
...
}


创建新连接的socket fd对应的文件读事件,绑定回调函数readQueryFromClient。
如果创建成功,则对 redisClient 进行一系列的初始化,因为 redisClient 是通用的,即不管是什么命令的请求,都是通过创建一个 redisClient 来处理的,所以会有比较多的字段需要初始化。

createClient 函数执行完后返回到调用处acceptCommonHandler函数,然后从acceptCommonHandler函数再返回到acceptTcpHandler函数。

接收请求完毕,准备接收客户端得数据

到此为止,新连接到来时的回调函数acceptTcpHandler执行完毕,在这个回调函数中创建了一个redisClient来处理这个客户端接下来的请求,并绑定了接收的新连接的读文件事件。当有数据可读时,网络i/o轮询(比如epoll)会有事件触发,此时绑定的回调函数readQueryFromClient将会调用来处理客户端发送过来的数据。

读取客户端请求的数据

在绑定有数据可读时的回调函数中的createClient函数中绑定了一个有数据可读时的回调函数readQueryFromClient函数,现在看看这个函数的具体内容

Networking.c:874

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
char buf[REDIS_IOBUF_LEN];
int nread;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);

server.current_client = c;
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
if (nread) {
c->querybuf = sdscatlen(c->querybuf,buf,nread);
c->lastinteraction = time(NULL);
} else {
server.current_client = NULL;
return;
}
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = getClientInfoString(c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
processInputBuffer(c);
server.current_client = NULL;
}


调用系统函数read来读取客户端传送过来的数据,调用read后对读取过程中被系统中断的情况(nread == -1 && errno == EAGAIN),客户端关闭的情况(nread == 0)进行了判断处理。
如果读取的数据超过限制(1GB)则报错。
读取完后进入processInputBuffer进行协议解析。

请求协议

从readQueryFromClient函数读取客户端传过来的数据,进入processInputBuffer函数进行协议解析,可以把processInputBuffer函数看作是输入数据的协议解析器

Networking.c:835
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

/* Determine request type when unknown. */
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
c->reqtype = REDIS_REQ_INLINE;
}
}

if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic("Unknown request type");
}

/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}


Redis支持两种协议,一种是inline,一种是multibulk。inline协议是老协议,现在一般只在命令行下的redis客户端使用,其他情况一般是使用multibulk协议。
如果客户端传送的数据的第一个字符时‘*’,那么传送数据将被当做multibulk协议处理,否则将被当做inline协议处理。Inline协议的具体解析函数是processInlineBuffer,multibulk协议的具体解析函数是processMultibulkBuffer。
当协议解析完毕,即客户端传送的数据已经解析出命令字段和参数字段,接下来进行命令处理,命令处理函数是processCommand。

Inline请求协议

Networking.c:679

int processInlineBuffer(redisClient *c) {
...
}


根据空格分割客户端传送过来的数据,把传送过来的命令和参数保存在argv数组中,把参数个数保存在argc中,argc的值包括了命令参数本身。即set key value命令,argc的值为3。详细解析见协议详解

Multibulk请求协议

Multibulk协议比inline协议复杂,它是二进制安全的,即传送数据可以包含不安全字符。Inline协议不是二进制安全的,比如,如果set key value命令中的key或value包含空白字符,那么inline协议解析时将会失败,因为解析出来的参数个数与命令需要的的参数个数会不一致。

协议格式

*<number of arguments> CR LF
$<number of bytes of argument 1> CR LF
<argument data> CR LF
...
$<number of bytes of argument N> CR LF
<argument data> CR LF

协议举例

*3
$3
SET
$5
mykey
$7
myvalue

具体解析代码位于

Networking.c:731

int processMultibulkBuffer(redisClient *c) {
...
}

详细解析见协议详解

处理命令

当协议解析完毕,则表示客户端的命令输入已经全部读取并已经解析成功,接下来就是执行客户端命令前的准备和执行客户端传送过来的命令

Redis.c:1062

/* If this function gets called we already read a whole
* command, argments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If 1 is returned the client is still alive and valid and
* and other operations can be performed by the caller. Otherwise
* if 0 is returned the client was destroied (i.e. after QUIT). */
int processCommand(redisClient *c) {
...
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
call(c);
...
}


lookupCommand先根据客户端传送过来的数据查找该命令并找到命令的对应处理函数。
Call函数调用该命令函数来处理命令,命令与对应处理函数的绑定位于。

Redi.c:72

struct redisCommand *commandTable;
struct redisCommand readonlyCommandTable[] = {
{"get",getCommand,2,0,NULL,1,1,1},
...
}

回复请求

回复请求位于对应的命令中,以get命令为例

T_string.c:67

void getCommand(redisClient *c) {
getGenericCommand(c);
}

T_string.c:52
int getGenericCommand(redisClient *c) {
robj *o;

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return REDIS_OK;

if (o->type != REDIS_STRING) {
addReply(c,shared.wrongtypeerr);
return REDIS_ERR;
} else {
addReplyBulk(c,o);
return REDIS_OK;
}
}


getGenericCommand在getset 命令中也会用到。
lookupKeyReadOrReply是以读数据为目的查询key函数,并且如果该key不存在,则在该函数中做不存在的回包处理。
如果该key存在,则返回该key对应的数据,addReply函数以及以addReply函数开头的都是回包函数。

绑定写数据的回调函数

接下来看看addReply函数里的内容

Networking.c:190

void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
...
}

Networking.c:64
int _installWriteEvent(redisClient *c) {
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}


addReply函数一进来就先调用绑定写数据的回调函数installWriteEvent。
installWriteEvent函数中创建了一个文件写事件和绑定写事件的回调函数为sendReplyToClient。

准备写的数据内容

addReply函数一进来后就绑定写数据的回调函数,接下来就是准备写的数据内容

Networking.c:190

void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);

/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
* refcount field of the object if it's not needed.
*
* If the encoding is RAW and there is room in the static buffer
* we'll be able to send the object to the client without
* messing with its page. */
if (obj->encoding == REDIS_ENCODING_RAW) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
} else {
/* FIXME: convert the long into string and use _addReplyToBuffer()
* instead of calling getDecodedObject. As this place in the
* code is too performance critical. */
obj = getDecodedObject(obj);
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
decrRefCount(obj);
}
}


先尝试把要返回的内容添加到发送数据缓冲区中(redisClient->buf),如果该缓冲区的大小已经放不下这次想放进去的数据,或者已经有数据在排队(redisClient->reply 链表不为空),则把数据添加到发送链表的尾部。

给客户端答复数据

在绑定写数据的回调函数中看到绑定了回调函数sendReplyToClient,现在来看看这个函数的主要内容

Networking.c:566

void sendReplyToClient(aeEventLoop *el, int fd, ...) {
...
while(c->bufpos > 0 || listLength(c->reply)) {
...
if(c->bufpos > 0){
...
nwritten=write(fd,...,c->bufpos-c->sentlen);
...
} else {
o = listNodeValue(listFirst(c->reply));
...
nwritten=write(fd,...,objlen-c->sentlen);
...
}
}
}


通过调用系统函数write给客户端发送数据,如果缓冲区有数据就把缓冲区的数据发送给客户端,缓冲区的数据发送完了,如果有排队数据,则继续发送。

退出

Redis 服务器的退出是通过shutdown命令来退出的,退出前会做一系列的清理工作

Db.c:347

void shutdownCommand(redisClient *c) {
if (prepareForShutdown() == REDIS_OK)
exit(0);
addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}


总结

框架从启动,接收请求,读取客户端数据,请求协议解析,处理命令,回复请求,退出对redis运行的整个流程做了一个梳理。对整个redis的运作和框架有了一个初步的了解。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: