您的位置:首页 > 数据库 > Redis

Redis源码简要分析

2014-11-16 08:06 281 查看
在文章的开头我们把所有服务端文件列出来,并且标示出其作用:

adlist.c //双向链表

ae.c //事件驱动

ae_epoll.c //epoll接口, linux用

ae_kqueue.c //kqueue接口, freebsd用

ae_select.c //select接口, windows用

anet.c //网络处理

aof.c //处理AOF文件

config.c //配置文件解析

db.c //DB处理

dict.c //hash表

intset.c //转换为数字类型数据

multi.c //事务,多条命令一起打包处理

networking.c //读取、解析和处理客户端命令

object.c //各种对像的创建与销毁,string、list、set、zset、hash

rdb.c //redis数据文件处理

redis.c //程序主要文件

replication.c //数据同步master-slave

sds.c //字符串处理

sort.c //用于list、set、zset排序

t_hash.c //hash类型处理

t_list.c //list类型处理

t_set.c //set类型处理

t_string.c //string类型处理

t_zset.c //zset类型处理

ziplist.c //节省内存方式的list处理

zipmap.c //节省内存方式的hash处理

zmalloc.c //内存管理
上面基本是redis最主要的处理文件,部分没有列出来,如VM之类的,就不在这里讲了。
首先我们来回顾一下redis的一些基本知识:

1、redis有N个DB(默认为16个DB),并且每个db有一个hash表负责存放key,同一个DB不能有相同的KEY,但是不同的DB可以相同的KEY;

2、支持的几种数据类型:string、hash、list、set、zset;

3、redis可以使用aof来保存写操作日志(也可以使用快照方式保存数据文件)

对于数据类型在这里简单的介绍一下(网上有图,下面我贴上图片可能更容易理解)

1、对于一个string对像,直接存储内容;

2、对于一个hash对像,当成员数量少于512的时候使用zipmap(一种很省内存的方式实现hash table),反之使用hash表(key存储成员名,value存储成员数据);

3、对于一个list对像,当成员数量少于512的时候使用ziplist(一种很省内存的方式实现list),反之使用双向链表(list);

4、对于一个set对像,使用hash表(key存储数据,内容为空)

5、对于一个zset对像,使用跳表(skip list),关于跳表的相关内容可以查看本blog的跳表学习笔记;



下面正式进入源代码的分析

1、首先是初始化配置,initServerConfig(redis.c:759)

void initServerConfig() {

server.port = REDIS_SERVERPORT;

server.bindaddr = NULL;

server.unixsocket = NULL;

server.ipfd = -1;

server.sofd = -1;

2.在初始化配置中调用了populateCommandTable(redis.c:925)函数,该函数的目地是将命令集分布到一个hash table中,大家可以看到每一个命令都对应一个处理函数,因为redis支持的命令集还是蛮多,所以如果要靠if分支来做命令处理的话即繁琐效率还底,
因此放到hash table中,在理想的情况下只需一次就能定位命令的处理函数。

void populateCommandTable(void) {

int j;

int numcommands = sizeof(readonlyCommandTable)/sizeof(struct redisCommand);

for (j = 0; j < numcommands; j++) {

struct redisCommand *c = readonlyCommandTable+j;

int retval;

retval = dictAdd(server.commands, sdsnew(c->name), c);

assert(retval == DICT_OK);

}

}

3、对参数的解析,redis-server有一个参数(可以不需要),这个参数是指定配置文件路径,然后由函数loadServerConfig(config.c:28)加载所有配置

if (argc == 2) {

if (strcmp(argv[1], “-v”) == 0 ||

strcmp(argv[1], “–version”) == 0) version();

if (strcmp(argv[1], “–help”) == 0) usage();

resetServerSaveParams();

loadServerConfig(argv[1]);

4、初始化服务器initServer(redis.c:836), 该函数初始化一些服务器信息,包括创建事件处理对像、db、socket、客户端链表、公共字符串等。

void initServer() {

int j;

signal(SIGHUP, SIG_IGN);

signal(SIGPIPE, SIG_IGN);

setupSignalHandlers();//设置信号处理

if (server.syslog_enabled) {

openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,

server.syslog_facility);

}

5、在上面初始化服务器中有一段代码是创建事件驱动,aeCreateTimeEvent是创建一个定时器,下面创建的定时器将会每毫秒调用 serverCron函数,而aeCreateFileEvent是创建网络事件驱动,当server.ipfd和server.sofd有数据可读的情
况将会分别调用函数acceptTcpHandler和acceptUnixHandler。

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,

acceptTcpHandler,NULL) == AE_ERR) oom(“creating file event”);

if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,

acceptUnixHandler,NULL) == AE_ERR) oom(“creating file event”);

6、接下来就是初始化数据,如果开启了AOF,那么会调用loadAppendOnlyFile(aof.c:216)去加载AOF文件,在AOF 文件中存放了客户端的命令,函数将数据读取出来然后依次去调用命令集去处理,当AOF文件很大的时候势必为影响客户端的请求,所以每处理1000条命令就
会去尝试接受和处理客户端的请求,其代码在aof.c第250行; 但是如果没有开启AOF并且有rdb的情况,会调用rdbLoad(redis.c:873)尝试去加载rdb文件,理所当然的在加载rdb文件的内部也
会考虑文件太大而影响客户端请求,所以跟AOF一样,每处理1000条也会尝试去接受和处理客户端请求。

7、当所有初始化工作做完之后,服务端就开始正式工作了

aeSetBeforeSleepProc(server.el,beforeSleep);

aeMain(server.el);

8、大家都知道redis是单线程模式,所有的请求、处理都是在同一个线程里面进行,也就是一个无限循环,在这个无限循环的内部有两件事要做,第一 件就是调用通过aeSetBeforeSleepProc函数设置的回调函数,第二件就是开始接受客户端的请求和处理,所以我们可以在第7节看到设置了回
调函数为beforeSleep,但是这个beforeSleep到底有什么作用呢?我们在第9节再详细讲述。对于aeMain(ae.c:375)就是 整个程序的主要循环。

void aeMain(aeEventLoop *eventLoop) {

eventLoop->stop = 0;

while (!eventLoop->stop) {

if (eventLoop->beforesleep != NULL)

eventLoop->beforesleep(eventLoop);

aeProcessEvents(eventLoop, AE_ALL_EVENTS);

}

}

9、在beforeSleep内部一共有三部分,第一部分对vm进行处理(即第一个if块),这里我们略过;第二部分是释放客户端的阻塞操作,在 redis里有两个命令BLPOP和BRPOP需要使用这些操作(弹出列表头或者尾,实现方式见t_list.c:862行的 blockingPopGenericCommand函数),当指定的key不存在或者列表为空的情况下,那么客户端会一直阻塞,直到列表有数据时,服务
端就会去执行lpop或者rpop并返回给客户端,那么什么时候需要用到BLPOP和BRPOP呢?大家平时肯定用redis做过队列,最常见的处理方式
就是使用llen去判断队列有没有数据,如果有数据就去取N条,然后处理,如果没有就sleep(3),然后继续循环,其实这里就可以使用BLPOP或者 BRPOP来轻松实现,而且可以减少请求,具体怎么实现留给大家思考;第三部分就是flushAppendOnlyFile(aof.c:60),这个函
数主要目的是将aofbuf的数据写到文件,那aofbuf是什么呢?他是AOF的一个缓冲区,所以客户端的命令都会在处理完后把这些命令追加到这个缓冲 区中,然后待一轮数据处理完之后统一写到文件(所以aof也是不能100%保证数据不丢失的,因为如果当redis正在处理这些命令的情况下服务就挂掉,
那么这部分的数据是没有保存到硬盘的),大家都知道写数据到文件并不是立即写到硬盘,只是保存到一个文件缓冲区中,什么情况下会把缓冲区的数据转到硬盘 呢?只要满足如下三种条件的一种就能将数据真正存到硬盘:1、手动调用刷新缓冲区;2、缓冲区已满;3、程序正常退出。因此redis将数据写到文件缓冲
区之后会判断是否需要刷到硬盘,server.appendfsync有两种方式,第一种(APPENDFSYNC_ALWAYS):无条件刷新,即每次 写文件都会保存到硬盘,第二种(APPENDFSYNC_EVERYSEC):每隔一秒保存到硬盘。

10、接下来我们开始讲解aeProcessEvents(ae.c:275)的处理流程,首先我们来回顾一下第5节设置的定时器和监听 socket事件处理,其中socket事件处理会回调acceptTcpHandler(networking.c:410)和定时器回调函数 serverCron(redis.c:519),在aeProcessEvents的内部有两部分需要处理,第一部分是调用aeApiPoll判断 socket是否有数据可读,整个服务端的socket里面要分监听socket和客户端socket,当有客户端链接服务器时,会触发监听socket 的事件处理函数,也就是acceptTcpHandler,而acceptTcpHandler会去调用 createClient(networking.c:13)创建客户端对像,然后为这个客户端设置事件处理函数 readQueryFromClient(networking.c:827),所以当客户端有消息时就会触发客户端socket事件处理函数,处理数据部分讲在后面详细讲解,接下来的第二部分就是定时器,每次在socket部分处理完后就用调用processTimeEvents(ae.c:212)来处理定时器,那么内部实现也很简单,当设置定时器的时候就会计算好应该触发的时间,所以这里就
只需要判断当前时间是否大于或者等于应该触发的时间即可。那么这个定时器到底做了什么呢?请继续第11节。

11、我们继续跟踪源代码serverCron(redis.c:519),整个函数分为七个部分,第一部分:在服务端打印一些关于DB的信息(包 括key数量,内存使用量等);第二部分:判断DB的hash
table是否需要扩展大小tryResizeHashTables(redis.c:432);第三部分:关闭太长时间没有通信的链接closeTimedoutClients(networking.c:629);第四部分:保存rdb文件 rdbSaveBackground(rdb.c:507),当然也是在需要保存的情况才会保存,即设置save参数;第五部分:清除过期的key,当然
这里不是清除全部,他只是随机取出一些activeExpireCycle(redic.c:477);第六部分:虚拟内存交换部分,将一部分key转到 虚拟内存中,这里的key也是随机抽取的, vmSwapOneObjectBlocking(vm.c:521);第七部分:主从同
步,replicationCron(replication.c:500)。

12、在第10节中我们讲到客户端socket处理函数readQueryFromClient,这里我们一层层分析,首先是从客户端读取数据,然 后调用processInputBuffer,在内部先是判断类型,然后调用processInlineBuffer或者 processMultibulkBuffer解析参数,解析后的参数由argv存储参数,其类型是一个指向指针的指针,其中argv[0]是命令名称,
后面就是命令参数,argc存储参数数量;然后调用processCommand(redis.c:979)处理命令,在内部调用 lookupCommand(redis.c:940)获取命令对应的函数,然后调用freeMemoryIfNeeded(redis.c:1385) 判断是否需要释放一些内存,接下来就是调用call(redis.c:954)去执行命令,执行命令后会调用feedAppendOnlyFile(aof.c:137)把命令行保存到aofbuf中,然后判断是否需要同步数据到slave,如果需要则调用replicationFeedSlaves(replication.c:10),接下来就是判断是否需要将数据发送到监控端,如果需要则调用replicationFeedMonitors(replication.c:82),到这里整个服务流程就结束了。至于每条命令如何执行,大家可以去
查看以t_开头的几个文件。下面是一张整个服务的流程图。



注:redis源代码为2.2.8,希望大家看文章的时候配合源代码一起看,更容易理解

Redis通过定义一个 struct redisServer 类型的全局变量server 来保存服务器的相关信息(比如:配置信息,统计信息,服务器状态等等)。启动时通过读取配置文件里边的信息对server进行初始化(如果没有指定配置文件,将使用默认值对sever进行初始化),初始化的内容有:起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,虚拟内存初始化,log初始化等等。

  启动

  初始化服务器配置

  先来看看redis 的main函数的入口

  Redis.c:1694

int main(int argc, char **argv) { 

    time_t start; 

    initServerConfig(); 

    if (argc == 2) { 

        if (strcmp(argv[1], "-v") == 0 || 

            strcmp(argv[1], "--version") == 0) version(); 

        if (strcmp(argv[1], "--help") == 0) usage(); 

        resetServerSaveParams(); 

        loadServerConfig(argv[1]); 

    } else if ((argc > 2)) { 

        usage(); 

    } else { 

        ... 

    } 

    if (server.daemonize) daemonize(); 

    initServer(); 

    ...
initServerConfig初始化全局变量 server 的属性为默认值。

如果命令行指定了配置文件, resetServerSaveParams重置对落地备份的配置(即重置为默认值)并读取配置文件的内容对全局变量 server 再进行初始化 ,没有在配置文件中配置的将使用默认值。

如果服务器配置成后台执行,则对服务器进行 daemonize。

initServer初始化服务器,主要是设置信号处理函数,初始化事件轮询,起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,初始化虚拟内存和log等等。

创建服务器监听端口。
  Redis.c:923

    if (server.port != 0) { 

        server.ipfd= anetTcpServer(server.neterr,server.port,server.bindaddr); 

        if (server.ipfd == ANET_ERR) { 

            redisLog(REDIS_WARNING, "Opening port %d: %s", 

                server.port, server.neterr); 

            exit(1); 

        } 

    }
anetTcpServer创建一个socket并进行监听,然后把返回的socket fd赋值给server.ipfd。
  事件轮询结构体定义

  先看看事件轮询的结构体定义

  Ae.h:88

/* State of an event based program */ 

typedef struct aeEventLoop { 

    int maxfd; 

    long long timeEventNextId; 

    aeFileEvent events[AE_SETSIZE]; /* Registered events */ 

    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */ 

    aeTimeEvent *timeEventHead; 

    int stop; 

    void *apidata; /* This is used for polling API specific data */ 

    aeBeforeSleepProc *beforesleep; 

} aeEventLoop;
maxfd是最大的文件描述符,主要用来判断是否有文件事件需要处理(ae.c:293)和当使用select 来处理网络IO时作为select的参数(ae_select.c:50)。

timeEventNextId 是下一个定时事件的ID。

events[AE_SETSIZE]用于保存通过aeCreateFileEvent函数创建的文件事件,在sendReplyToClient函数和freeClient函数中通过调用aeDeleteFileEvent函数删除已经处理完的事件。

fired[AE_SETSIZE]用于保存已经触发的文件事件,在对应的网络I/O函数中进行赋值(epoll,select,kqueue),不会对fired进行删除操作,只会一直覆盖原来的值。然后在aeProcessEvents函数中对已经触发的事件进行处理。

timeEventHead 是定时事件链表的头,定时事件的存储用链表实现。

Stop 用于停止事件轮询处理。

apidata 用于保存轮询api需要的数据,即aeApiState结构体,对于epoll来说,aeApiState结构体的定义如下:

typedef struct aeApiState { 

    int epfd; 

    struct epoll_event events[AE_SETSIZE]; 

} aeApiState;
beforesleep 是每次进入处理事件时执行的函数。
  创建事件轮询

  Redis.c:920

  server.el = aeCreateEventLoop(); 

Ae.c:55 

aeEventLoop *aeCreateEventLoop(void) { 

    aeEventLoop *eventLoop; 

    int i; 

    eventLoop = zmalloc(sizeof(*eventLoop)); 

    if (!eventLoop) return NULL; 

    eventLoop->timeEventHead = NULL; 

    eventLoop->timeEventNextId = 0; 

    eventLoop->stop = 0; 

    eventLoop->maxfd = -1; 

    eventLoop->beforesleep = NULL; 

    if (aeApiCreate(eventLoop) == -1) { 

        zfree(eventLoop); 

        return NULL; 

    } 

/* Events with mask == AE_NONE are not set. So let's initialize 

 * the vector with it. */ 

    for (i = 0; i < AE_SETSIZE; i++) 

        eventLoop->events[i].mask = AE_NONE; 

    return eventLoop; 

}
  绑定定时函数和有新连接时的回调函数

  redis.c:973

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); 

if (server.ipfd > 0 && 

    aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, 

acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
aeCreateTimeEvent创建定时事件并绑定回调函数serverCron,这个定时事件第一次是超过1毫秒就有权限执行,如果其他事件的处理时间比较长,可能会出现超过一定时间都没执行情况。这里的1毫秒只是超过后有可执行的权限,并不是一定会执行。第一次执行后,如果还要执行,是由定时函数的返回值确定的,在processTimeEvents(ae.c:219)中,当调用定时回调函数后,获取定时回调函数的返回值,如果返回值不等于-1,则设置定时回调函数的下一次触发时间为当前时间加上定时回调函数的返回值,即调用间隔时间。serverCron的返回值是100ms,表明从二次开始,每超过100ms就有权限执行。(定时回调函数serverCron用于更新lru时钟,更新服务器的状态,打印一些服务器信息,符合条件的情况下对hash表进行重哈希,启动后端写AOF或者检查后端写AOF或者备份是否完成,检查过期的KEY等等)

aeCreateFileEvent创建监听端口的socket fd的文件读事件(即注册网络io事件)并绑定回调函数acceptTcpHandler。
  进入事件轮询

  初始化后将进入事件轮询

  Redis.c:1733

    aeSetBeforeSleepProc(server.el,beforeSleep); 

    aeMain(server.el); 

    aeDeleteEventLoop(server.el);
设置每次进入事件处理前会执行的函数beforeSleep。

进入事件轮询aeMain。

退出事件轮询后删除事件轮询,释放事件轮询占用内存aeDeleteEventLoop(不过没在代码中发现有执行到这一步的可能,服务器接到shutdown命令时通过一些处理后直接就通过exit退出了,可能是我看错了,待验证)。
  事件轮询函数aeMain

  看看aeMain的内容

  Ae.c:382

void aeMain(aeEventLoop *eventLoop) { 

    eventLoop->stop = 0; 

    while (!eventLoop->stop) { 

        if (eventLoop->beforesleep != NULL) 

            eventLoop->beforesleep(eventLoop); 

        aeProcessEvents(eventLoop, AE_ALL_EVENTS); 

    } 

}
每次进入事件处理前,都会调用设置的beforesleep,beforeSleep函数主要是处理被阻塞的命令和根据配置写AOF。

aeProcessEvents处理定时事件和网络io事件。
  启动完毕,等待客户端请求

  到进入事件轮询函数后,redis的启动工作就做完了,接下来就是等待客户端的请求了。

  接收请求

  新连接到来时的回调函数

  在绑定定时函数和有新连接时的回调函数中说到了绑定有新连接来时的回调函数acceptTcpHandler,现在来看看这个函数的具体内容

  Networking.c:427

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { 

    int cport, cfd; 

    char cip[128]; 

    REDIS_NOTUSED(el); 

    REDIS_NOTUSED(mask); 

    REDIS_NOTUSED(privdata); 

    cfd = anetTcpAccept(server.neterr, fd, cip, &cport); 

    if (cfd == AE_ERR) { 

        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); 

        return; 

    } 

    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); 

    acceptCommonHandler(cfd); 

}
anetTcpAccept 函数 accept新连接,返回的cfd是新连接的socket fd。

acceptCommonHandler 函数是对新建立的连接进行处理,这个函数在使用 unix socket 时也会被用到。
  接收客户端的新连接

  接下来看看anetTcpAccept函数的具体内容

Anet.c:330 

int anetTcpAccept(char *err, int s, char *ip, int *port) { 

    int fd; 

    struct sockaddr_in sa; 

    socklen_t salen = sizeof(sa); 

    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) 

        return ANET_ERR; 

    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); 

    if (port) *port = ntohs(sa.sin_port); 

    return fd; 

}
  再进去anetGenericAccept 看看

  Anet.c:313

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { 

    int fd; 

    while(1) { 

        fd = accept(s,sa,len); 

        if (fd == -1) { 

            if (errno == EINTR) 

                continue; 

            else { 

                anetSetError(err, "accept: %s", strerror(errno)); 

                return ANET_ERR; 

            } 

        } 

        break; 

    } 

    return fd; 

}
anetTcpAccept 函数中调用anetGenericAccept 函数进行接收新连接,anetGenericAccept函数在 unix socket 的新连接处理中也会用到。

anetTcpAccept 函数接收新连接后,获取客户端得ip,port 并返回。
  创建redisClient进行接收处理

  anetTcpAccept 运行完后,返回新连接的socket fd, 然后返回到调用函数acceptTcpHandler中,继续执行acceptCommonHandler 函数

  Networking.c:403

static void acceptCommonHandler(int fd) { 

    redisClient *c; 

    if ((c = createClient(fd)) == NULL) { 

        redisLog(REDIS_WARNING,"Error allocating resoures for the client"); 

        close(fd); /* May be already closed, just ingore errors */ 

        return; 

    } 

    /* If maxclient directive is set and this is one client more... close the 

     * connection. Note that we create the client instead to check before 

     * for this condition, since now the socket is already set in nonblocking 

     * mode and we can send an error for free using the Kernel I/O */ 

    if (server.maxclients && listLength(server.clients) > server.maxclients) { 

        char *err = "-ERR max number of clients reached\r\n"; 

        /* That's a best effort error message, don't check write errors */ 

        if (write(c->fd,err,strlen(err)) == -1) { 

            /* Nothing to do, Just to avoid the warning... */ 

        } 

        freeClient(c); 

        return; 

    } 

    server.stat_numconnections++; 

}
创建一个 redisClient 来处理新连接,每个连接都会创建一个 redisClient 来处理。

如果配置了最大并发客户端,则对现有的连接数进行检查和处理。

最后统计连接数。
  绑定有数据可读时的回调函数

  Networking.c:15

redisClient *createClient(int fd) { 

    redisClient *c = zmalloc(sizeof(redisClient)); 

    c->bufpos = 0; 

    anetNonBlock(NULL,fd); 

    anetTcpNoDelay(NULL,fd); 

    if (aeCreateFileEvent(server.el,fd,AE_READABLE, 

        readQueryFromClient, c) == AE_ERR) 

    { 

        close(fd); 

        zfree(c); 

        return NULL; 

    } 

    selectDb(c,0); 

    c->fd = fd; 

    c->querybuf = sdsempty(); 

c->reqtype = 0; 

... 

}
创建新连接的socket fd对应的文件读事件,绑定回调函数readQueryFromClient。

如果创建成功,则对 redisClient 进行一系列的初始化,因为 redisClient 是通用的,即不管是什么命令的请求,都是通过创建一个 redisClient 来处理的,所以会有比较多的字段需要初始化。
  createClient 函数执行完后返回到调用处acceptCommonHandler函数,然后从acceptCommonHandler函数再返回到acceptTcpHandler函数。

  接收请求完毕,准备接收客户端得数据

  到此为止,新连接到来时的回调函数acceptTcpHandler执行完毕,在这个回调函数中创建了一个redisClient来处理这个客户端接下来的请求,并绑定了接收的新连接的读文件事件。当有数据可读时,网络i/o轮询(比如epoll)会有事件触发,此时绑定的回调函数readQueryFromClient将会调用来处理客户端发送过来的数据。

  读取客户端请求的数据

  在绑定有数据可读时的回调函数中的createClient函数中绑定了一个有数据可读时的回调函数readQueryFromClient函数,现在看看这个函数的具体内容

  Networking.c:874

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { 

    redisClient *c = (redisClient*) privdata; 

    char buf[REDIS_IOBUF_LEN]; 

    int nread; 

    REDIS_NOTUSED(el); 

    REDIS_NOTUSED(mask); 

    server.current_client = c; 

    nread = read(fd, buf, REDIS_IOBUF_LEN); 

    if (nread == -1) { 

        if (errno == EAGAIN) { 

            nread = 0; 

        } else { 

            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); 

            freeClient(c); 

            return; 

        } 

    } else if (nread == 0) { 

        redisLog(REDIS_VERBOSE, "Client closed connection"); 

        freeClient(c); 

        return; 

    } 

    if (nread) { 

        c->querybuf = sdscatlen(c->querybuf,buf,nread); 

        c->lastinteraction = time(NULL); 

    } else { 

        server.current_client = NULL; 

        return; 

    } 

    if (sdslen(c->querybuf) > server.client_max_querybuf_len) { 

        sds ci = getClientInfoString(c), bytes = sdsempty(); 

        bytes = sdscatrepr(bytes,c->querybuf,64); 

        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); 

        sdsfree(ci); 

        sdsfree(bytes); 

        freeClient(c); 

        return; 

    } 

    processInputBuffer(c); 

    server.current_client = NULL; 

}
调用系统函数read来读取客户端传送过来的数据,调用read后对读取过程中被系统中断的情况(nread == -1 && errno == EAGAIN),客户端关闭的情况(nread == 0)进行了判断处理。

如果读取的数据超过限制(1GB)则报错。

读取完后进入processInputBuffer进行协议解析。
  请求协议

  从readQueryFromClient函数读取客户端传过来的数据,进入processInputBuffer函数进行协议解析,可以把processInputBuffer函数看作是输入数据的协议解析器

  Networking.c:835

void processInputBuffer(redisClient *c) { 

    /* Keep processing while there is something in the input buffer */ 

    while(sdslen(c->querybuf)) { 

        /* Immediately abort if the client is in the middle of something. */ 

        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; 

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is 

         * written to the client. Make sure to not let the reply grow after 

         * this flag has been set (i.e. don't process more commands). */ 

        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; 

        /* Determine request type when unknown. */ 

        if (!c->reqtype) { 

            if (c->querybuf[0] == '*') { 

                c->reqtype = REDIS_REQ_MULTIBULK; 

            } else { 

                c->reqtype = REDIS_REQ_INLINE; 

            } 

        } 

        if (c->reqtype == REDIS_REQ_INLINE) { 

            if (processInlineBuffer(c) != REDIS_OK) break; 

        } else if (c->reqtype == REDIS_REQ_MULTIBULK) { 

            if (processMultibulkBuffer(c) != REDIS_OK) break; 

        } else { 

            redisPanic("Unknown request type"); 

        } 

        /* Multibulk processing could see a <= 0 length. */ 

        if (c->argc == 0) { 

            resetClient(c); 

        } else { 

            /* Only reset the client when the command was executed. */ 

            if (processCommand(c) == REDIS_OK) 

                resetClient(c); 

        } 

    } 

}
Redis支持两种协议,一种是inline,一种是multibulk。inline协议是老协议,现在一般只在命令行下的redis客户端使用,其他情况一般是使用multibulk协议。

如果客户端传送的数据的第一个字符时‘*’,那么传送数据将被当做multibulk协议处理,否则将被当做inline协议处理。Inline协议的具体解析函数是processInlineBuffer,multibulk协议的具体解析函数是processMultibulkBuffer。

当协议解析完毕,即客户端传送的数据已经解析出命令字段和参数字段,接下来进行命令处理,命令处理函数是processCommand。
  Inline请求协议

  Networking.c:679

int processInlineBuffer(redisClient *c) { 

    ... 

}
根据空格分割客户端传送过来的数据,把传送过来的命令和参数保存在argv数组中,把参数个数保存在argc中,argc的值包括了命令参数本身。即set key value命令,argc的值为3。详细解析见协议详解
  Multibulk请求协议

  Multibulk协议比inline协议复杂,它是二进制安全的,即传送数据可以包含不安全字符。Inline协议不是二进制安全的,比如,如果set key value命令中的key或value包含空白字符,那么inline协议解析时将会失败,因为解析出来的参数个数与命令需要的的参数个数会不一致。

  协议格式

*<number of arguments> CR LF 

$<number of bytes of argument 1> CR LF 

<argument data> CR LF 

... 

$<number of bytes of argument N> CR LF 

<argument data> CR LF
  协议举例

*3 

$3 

SET 

$5 

mykey 

$7 

myvalue
  具体解析代码位于

  Networking.c:731

int processMultibulkBuffer(redisClient *c) { 

... 

}
  详细解析见协议详解

  处理命令

  当协议解析完毕,则表示客户端的命令输入已经全部读取并已经解析成功,接下来就是执行客户端命令前的准备和执行客户端传送过来的命令

  Redis.c:1062

/* If this function gets called we already read a whole 

 * command, argments are in the client argv/argc fields. 

 * processCommand() execute the command or prepare the 

 * server for a bulk read from the client. 

 * 

 * If 1 is returned the client is still alive and valid and 

 * and other operations can be performed by the caller. Otherwise 

 * if 0 is returned the client was destroied (i.e. after QUIT). */ 

int processCommand(redisClient *c) { 

... 

 /* Now lookup the command and check ASAP about trivial error conditions 

  * such as wrong arity, bad command name and so forth. */ 

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); 

... 

call(c); 

... 

}
lookupCommand先根据客户端传送过来的数据查找该命令并找到命令的对应处理函数。

Call函数调用该命令函数来处理命令,命令与对应处理函数的绑定位于。
  Redi.c:72

struct redisCommand *commandTable; 

struct redisCommand readonlyCommandTable[] = { 

{"get",getCommand,2,0,NULL,1,1,1}, 

... 

}
  回复请求

  回复请求位于对应的命令中,以get命令为例

  T_string.c:67

void getCommand(redisClient *c) { 

    getGenericCommand(c); 

}
  T_string.c:52

int getGenericCommand(redisClient *c) { 

    robj *o; 

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) 

        return REDIS_OK; 

    if (o->type != REDIS_STRING) { 

        addReply(c,shared.wrongtypeerr); 

        return REDIS_ERR; 

    } else { 

        addReplyBulk(c,o); 

        return REDIS_OK; 

    } 

}
getGenericCommand在getset 命令中也会用到。

lookupKeyReadOrReply是以读数据为目的查询key函数,并且如果该key不存在,则在该函数中做不存在的回包处理。

如果该key存在,则返回该key对应的数据,addReply函数以及以addReply函数开头的都是回包函数。
  绑定写数据的回调函数

  接下来看看addReply函数里的内容

  Networking.c:190

void addReply(redisClient *c, robj *obj) { 

    if (_installWriteEvent(c) != REDIS_OK) return; 

    ... 

}
  Networking.c:64

int _installWriteEvent(redisClient *c) { 

    if (c->fd <= 0) return REDIS_ERR; 

    if (c->bufpos == 0 && listLength(c->reply) == 0 && 

        (c->replstate == REDIS_REPL_NONE || 

         c->replstate == REDIS_REPL_ONLINE) && 

        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, 

        sendReplyToClient, c) == AE_ERR) return REDIS_ERR; 

    return REDIS_OK; 

}
addReply函数一进来就先调用绑定写数据的回调函数installWriteEvent。

installWriteEvent函数中创建了一个文件写事件和绑定写事件的回调函数为sendReplyToClient。
  准备写的数据内容

    addReply函数一进来后就绑定写数据的回调函数,接下来就是准备写的数据内容

  Networking.c:190

void addReply(redisClient *c, robj *obj) { 

    if (_installWriteEvent(c) != REDIS_OK) return; 

    redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); 

    /* This is an important place where we can avoid copy-on-write 

     * when there is a saving child running, avoiding touching the 

     * refcount field of the object if it's not needed. 

     * 

     * If the encoding is RAW and there is room in the static buffer 

     * we'll be able to send the object to the client without 

     * messing with its page. */ 

    if (obj->encoding == REDIS_ENCODING_RAW) { 

        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) 

            _addReplyObjectToList(c,obj); 

    } else { 

        /* FIXME: convert the long into string and use _addReplyToBuffer() 

         * instead of calling getDecodedObject. As this place in the 

         * code is too performance critical. */ 

        obj = getDecodedObject(obj); 

        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) 

            _addReplyObjectToList(c,obj); 

        decrRefCount(obj); 

    } 

}
先尝试把要返回的内容添加到发送数据缓冲区中(redisClient->buf),如果该缓冲区的大小已经放不下这次想放进去的数据,或者已经有数据在排队(redisClient->reply 链表不为空),则把数据添加到发送链表的尾部。
  给客户端答复数据

  在绑定写数据的回调函数中看到绑定了回调函数sendReplyToClient,现在来看看这个函数的主要内容

  Networking.c:566

void sendReplyToClient(aeEventLoop *el, int fd, ...) { 

    ... 

while(c->bufpos > 0 || listLength(c->reply)) { 

    ... 

    if(c->bufpos > 0){ 

        ... 

            nwritten=write(fd,...,c->bufpos-c->sentlen); 

            ... 

        } else { 

            o = listNodeValue(listFirst(c->reply)); 

            ... 

            nwritten=write(fd,...,objlen-c->sentlen); 

            ... 

        } 

    } 

}
通过调用系统函数write给客户端发送数据,如果缓冲区有数据就把缓冲区的数据发送给客户端,缓冲区的数据发送完了,如果有排队数据,则继续发送。
  退出

  Redis 服务器的退出是通过shutdown命令来退出的,退出前会做一系列的清理工作

  Db.c:347

void shutdownCommand(redisClient *c) { 

    if (prepareForShutdown() == REDIS_OK) 

        exit(0); 

    addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); 

}
  总结

  框架从启动,接收请求,读取客户端数据,请求协议解析,处理命令,回复请求,退出对redis运行的整个流程做了一个梳理。对整个redis的运作和框架有了一个初步的了解。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: