您的位置:首页 > 理论基础 > 计算机网络

redis原理-网络框架

2016-02-17 09:31 441 查看
redis原理-网络框架
一、  流程

redis自己封装了一套网络框架AE框架,根据不同的系统封装了不同的实现,在linux下使用的是epoll事件模型。

在头文件config.h定义了根据当前系统环境使用的模型宏:

/* Test for polling API: advertise which I/O multiplexing backends the
 * target OS offers. ae.c picks the best advertised one. */

/* Linux: epoll. */
#if defined(__linux__)
#define HAVE_EPOLL 1
#endif

/* FreeBSD / OpenBSD / NetBSD, and Mac OS X 10.6+: kqueue. */
#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || \
    (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6))
#define HAVE_KQUEUE 1
#endif

/* Sun/Solaris: event ports, only when _DTRACE_VERSION is defined
 * (presumably provided by <sys/feature_tests.h>, which is included first). */
#if defined(__sun)
#include <sys/feature_tests.h>
#if defined(_DTRACE_VERSION)
#define HAVE_EVPORT 1
#endif
#endif

 

在ae.c(网络框架实现文件)里面最开始:

根据不同的环境包含(#include)不同的 c 文件来实现跨平台,并按性能从高到低依次选择:evport > epoll > kqueue > select。select 是最后的兜底实现,一般在 windows 场景使用得较多(linux 下也有,但效率不如 epoll)。

/* Include the bestmultiplexing layer supported by this system.

 * The following should be ordered byperformances, descending. */

#ifdef HAVE_EVPORT

#include"ae_evport.c"

#else

    #ifdef HAVE_EPOLL

    #include "ae_epoll.c"

    #else

        #ifdef HAVE_KQUEUE

        #include "ae_kqueue.c"

        #else

        #include "ae_select.c"

        #endif

    #endif

#endif

 

本分析是基于常规linux下环境,所以只分析基于epoll的框架。

 

在redis.h声明的redisServer定义了成员:el

struct redisServer {

aeEventLoop *el; //保存了网络事件循环

 

在 redis.c 的 initServer() 中调用:

server.el =aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

 

// server.maxclients defaults to REDIS_MAX_CLIENTS:
// #define REDIS_MAX_CLIENTS 10000

// So with the defaults the event loop tracks 10000 + 32 + 96 = 10128
// file descriptors (events).

/* When configuring the Redis event loop, we set it up so that the total
 * number of file descriptors we can handle is
 * server.maxclients + RESERVED_FDS + FDSET_INCR, which is our safety margin. */
#define REDIS_EVENTLOOP_FDSET_INCR (REDIS_MIN_RESERVED_FDS+96)

#define REDIS_MIN_RESERVED_FDS 32 /* reserved fds, part of the margin above */

 

//文件读写事件(这里说的文件读写其实是说的网络接收和发送,在linux下所有的io设备都称为文件,那么网络也被抽象成一个io文件)

 

ae.h/

 

/* File event structure: the per-fd registration kept in
 * aeEventLoop.events[fd] (filled in by aeCreateFileEvent). */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE)*/  // bitmask of registered event kinds; AE_NONE when the slot is unused
    aeFileProc *rfileProc;  // handler registered for AE_READABLE (read callback)
    aeFileProc *wfileProc;  // handler registered for AE_WRITABLE (write callback)
    void *clientData;       // opaque data registered with the handlers (e.g. the redisClient of a client socket)
} aeFileEvent;

 

// Timer.
/* Time event structure: one timer in the singly linked list rooted at
 * aeEventLoop.timeEventHead. */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */ // taken from aeEventLoop.timeEventNextId at creation
    long when_sec; /* seconds */    // absolute due time, whole seconds
    long when_ms; /* milliseconds */ // absolute due time, millisecond part
    aeTimeProc *timeProc;                // timer callback
    aeEventFinalizerProc *finalizerProc; // optional finalizer callback (NULL is allowed, e.g. for serverCron)
    void *clientData;                    // opaque data registered with the timer
    struct aeTimeEvent *next;            // next timer in the list
} aeTimeEvent;

 

/* A fired event */             // an fd that is ready and waiting to be processed
typedef struct aeFiredEvent {
    int fd;       // the ready file descriptor
    int mask;     // AE_* bits for the operations that can be performed now (presumably set by the polling backend)
} aeFiredEvent;

 

/* State of an event basedprogram */

typedef struct aeEventLoop {

    int maxfd;  /* highest file descriptor currently registered */ //当前注册的最高文件描述符

    int setsize; /* max number of filedescriptors tracked */ 目前追踪的最大文件描述符个数

    long long timeEventNextId;

    time_t lastTime;     /* Used to detect system clock skew */

    aeFileEvent *events; /* Registered events*/ //已经注册的事件

    aeFiredEvent *fired; /* Fired events */ //待处理的事件

    aeTimeEvent *timeEventHead; //时间事件头指针

    int stop;

    void *apidata; /* This is used for pollingAPI specific data */ //epoll私有数据

    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

 

 

ae.c/

aeEventLoop*aeCreateEventLoop(int setsize) {

    aeEventLoop *eventLoop;

    int i;

 

    if ((eventLoop =zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    eventLoop->events =zmalloc(sizeof(aeFileEvent)*setsize); //setsize = 10128

    eventLoop->fired =zmalloc(sizeof(aeFiredEvent)*setsize);

    if (eventLoop->events == NULL || eventLoop->fired== NULL) goto err;

    eventLoop->setsize = setsize;

    eventLoop->lastTime = time(NULL);

    eventLoop->timeEventHead = NULL;

    eventLoop->timeEventNextId = 0;

    eventLoop->stop = 0;

    eventLoop->maxfd = -1; //默认-1

    eventLoop->beforesleep = NULL;

    if (aeApiCreate(eventLoop) == -1) goto err;

    /* Events with mask == AE_NONE are not set.So let's initialize the

     * vector with it. */

    for (i = 0; i < setsize; i++)

        eventLoop->events[i].mask = AE_NONE;//所有事件的mask默认都是事件为空

    return eventLoop;

 

err:

    if (eventLoop) {

        zfree(eventLoop->events);

        zfree(eventLoop->fired);

        zfree(eventLoop);

    }  

    return NULL;

}

 

/* Create the epoll backend state for `eventLoop` and attach it as
 * eventLoop->apidata.
 *
 * Allocates an array of eventLoop->setsize struct epoll_event entries
 * (presumably the buffer later handed to epoll_wait) and creates the epoll
 * instance.
 * Returns 0 on success, or -1 on failure. Nothing allocated here is leaked
 * on failure. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* 1024 is just a hint for the kernel. It does NOT limit how many fds can
     * be registered: the size argument is ignored since Linux 2.6.8 but must
     * be greater than zero. */
    state->epfd = epoll_create(1024);
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state; /* bind the backend state to the loop */
    return 0;
}

 

/* Private state of the epoll backend; aeApiCreate() allocates it and stores
 * it in aeEventLoop.apidata. */
typedef struct aeApiState {
    int epfd;                   // epoll instance fd returned by epoll_create()
    struct epoll_event *events; // array of eventLoop->setsize epoll events allocated in aeApiCreate()
} aeApiState;

 

 

// redis.c / initServer()
// server.ipfd_count = 0;
// int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
// #define REDIS_BINDADDR_MAX 16
/* Open the TCP listening socket(s) for the user commands. Startup is aborted
 * if listenToPort() fails; nothing is opened when port is 0. */
if (server.port != 0&&
listenToPort(server.port,server.ipfd,&server.ipfd_count)== REDIS_ERR)
exit(1);

 

//监听端口

int listenToPort(int port,int *fds, int *count) {

    int j;

 
//server.bindaddr_count 初始化是0的
//char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we shouldbind to */
//#define REDIS_BINDADDR_MAX 16
/* Force binding of 0.0.0.0 if no bind address is specified,always
* entering the loop if j == 0. */
    if(server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j <server.bindaddr_count || j == 0; j++) {
        if(server.bindaddr[j] == NULL) {
            /* Bind * forboth IPv6 and IPv4, we enter here only if
             *server.bindaddr_count == 0. */
              //先绑定ipv6的地址
            fds[*count] =anetTcp6Server(server.neterr,port,NULL,
               server.tcp_backlog);
            if(fds[*count] != ANET_ERR) {
             
              //设置socket为非阻塞模式
               anetNonBlock(NULL,fds[*count]);
               (*count)++;
            }
      
           //绑定ipv4
            fds[*count] =anetTcpServer(server.neterr,port,NULL,
               server.tcp_backlog);
            if(fds[*count] != ANET_ERR) {
               anetNonBlock(NULL,fds[*count]);
               (*count)++;
            }
            /* Exit theloop if we were able to bind * on IPv4 or IPv6,
             * otherwisefds[*count] will be ANET_ERR and we'll print an
             * error andreturn to the caller with an error. */
            if (*count)break;
        } else if(strchr(server.bindaddr[j],':')) {
            /* Bind IPv6address. */
            fds[*count] =anetTcp6Server(server.neterr,port,server.bindaddr[j],
               server.tcp_backlog);
        } else {
            /* Bind IPv4address. */
            fds[*count] =anetTcpServer(server.neterr,port,server.bindaddr[j],
               server.tcp_backlog);
        }
        if (fds[*count] ==ANET_ERR) {
           redisLog(REDIS_WARNING,
               "Creating Server TCP listening socket %s:%d: %s",
               server.bindaddr[j] ? server.bindaddr[j] : "*",
                port,server.neterr);
            returnREDIS_ERR;
        }
       anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return REDIS_OK;
}
/* Shared implementation, defined further down in this file. */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog);

/* Create an IPv4 TCP socket bound to bindaddr:port (the wildcard address when
 * bindaddr is NULL) and listening with `backlog`.
 * Returns the socket fd, or ANET_ERR with a message written to `err`. */
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}

/* IPv6 counterpart of anetTcpServer(); anetV6Only() is applied to the socket
 * before it is bound. */
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}
/* Shared implementation of anetTcpServer()/anetTcp6Server().
 *
 * Resolves bindaddr:port for address family `af` (the wildcard address when
 * bindaddr is NULL, via AI_PASSIVE), then creates a socket for the first
 * candidate address that allows it, configures it (IPv6-only for AF_INET6,
 * SO_REUSEADDR), binds it and starts listening with `backlog`.
 *
 * Returns the listening fd, or ANET_ERR with a message in `err`. Every
 * failure path releases the socket created here (anetListen() closes it
 * itself on failure). */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;      /* -1: no socket created yet */
    char _port[6];       /* strlen("65535") + NUL */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,sizeof(_port),"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    /* Resolve the list of candidate local addresses. */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        /* Configuration failures must close the socket: go through `error`. */
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        /* anetListen() already closed the socket on failure, so skip the
         * close in `error` to avoid closing the fd twice. */
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR)
            s = ANET_ERR;
        goto end;
    }
    /* Only reached when socket() failed for every candidate (s == -1). */
    anetSetError(err, "unable to bind socket");

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
/* Enable SO_REUSEADDR on `fd`.
 * Returns ANET_OK, or ANET_ERR with a message stored in `err`. */
static int anetSetReuseAddr(char *err, int fd) {
    int on = 1;
    int rc;

    /* Make sure connection-intensive things like the redis benchmark
     * will be able to close/open sockets a zillion of times */
    rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    if (rc != -1)
        return ANET_OK;

    anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
    return ANET_ERR;
}
 
/* Bind socket `s` to the address `sa` (of length `len`) and start listening
 * with `backlog`.
 * Returns ANET_OK. On failure a message is stored in `err`, the socket is
 * closed here (callers must not close it again), and ANET_ERR is returned. */
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s, sa, len) != -1) {
        if (listen(s, backlog) != -1)
            return ANET_OK;          /* bound and listening */
        anetSetError(err, "listen: %s", strerror(errno));
    } else {
        anetSetError(err, "bind: %s", strerror(errno));
    }
    close(s);
    return ANET_ERR;
}
/* Put `fd` into non-blocking mode: O_NONBLOCK is added to the file status
 * flags and the other flags are kept.
 * `err` may be NULL (the callers shown pass NULL; presumably anetSetError
 * ignores a NULL buffer).
 * Returns ANET_OK, or ANET_ERR with a message in `err`.
 * Note that fcntl(2) for F_GETFL and F_SETFL can't be interrupted by a
 * signal, so there is no EINTR retry. */
int anetNonBlock(char *err, int fd)
{
    int current = fcntl(fd, F_GETFL);

    if (current == -1) {
        anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
        return ANET_ERR;
    }
    if (fcntl(fd, F_SETFL, current | O_NONBLOCK) != -1)
        return ANET_OK;

    anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
    return ANET_ERR;
}
 
 
// redis.c / initServer()
    /* Create the serverCron() time event, that's our main way to process
     * background operations. Its first run is due 1 ms from now. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        /* Watch each listening socket for AE_READABLE: on a listening socket,
         * "readable" means a new connection is waiting to be accepted
         * (handled by acceptTcpHandler), not that data has arrived. */
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
 
 
// ae.c
/* Helper defined further down in this file. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms);

/* Register a timer due `milliseconds` from now.
 *
 * proc:          timer callback.
 * clientData:    opaque pointer stored with the timer.
 * finalizerProc: optional finalizer stored with the timer (may be NULL).
 *
 * The new timer is pushed on the front of eventLoop->timeEventHead.
 * Returns the timer id (taken from eventLoop->timeEventNextId), or AE_ERR if
 * the allocation fails. The id counter is advanced even in that case. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    te->id = id;
    /* Due time = current wall-clock time + `milliseconds`. */
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;

    /* Insert at the head of the list: O(1), newest timer first. */
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}
 
/* Helper defined further down in this file. */
static void aeGetTime(long *seconds, long *milliseconds);

/* Compute the absolute time `milliseconds` from now, split into whole
 * seconds (*sec) and a millisecond remainder (*ms). */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    /* Carry into seconds: for non-negative input both addends are below
     * 1000, so at most one second can overflow. */
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
/* Read the current wall-clock time (gettimeofday) as whole seconds plus a
 * millisecond part in [0, 999]. */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    *milliseconds = now.tv_usec / 1000;   /* microseconds -> milliseconds */
    *seconds = now.tv_sec;
}
 
/* Backend hook defined further down in this file. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);

/* Register interest in `mask` (AE_READABLE and/or AE_WRITABLE) on `fd`,
 * with `proc` as the handler for each event kind in `mask`.
 *
 * Masks accumulate: adding AE_WRITABLE to an fd that already has
 * AE_READABLE keeps both. clientData replaces any previous value.
 *
 * Returns AE_OK. Returns AE_ERR if fd is outside [0, setsize) (errno is set
 * to ERANGE) or if the polling backend rejects the registration. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *fe;

    /* events[] has exactly setsize slots. Reject any fd that would index
     * outside it, including negative ones. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    fe = &eventLoop->events[fd];

    /* Register with the backend first. It reads the old events[fd].mask to
     * choose ADD vs MOD, so the table is only updated afterwards. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;

    /* Track the highest registered fd. */
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
/* epoll backend: start watching `fd` for `mask`, or extend an existing watch.
 *
 * The epoll interest set becomes the union of the mask already recorded in
 * eventLoop->events[fd].mask and `mask`.
 * Returns 0 on success, or -1 if epoll_ctl() fails (errno set by epoll_ctl). */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    /* EPOLLET is not set, so the fd uses epoll's default level-triggered
     * mode. */
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
 
 
// networking.c
/* Defined further down in this file. */
static void acceptCommonHandler(int fd, int flags);

/* AE_READABLE handler for the listening TCP sockets (fd).
 *
 * Accepts up to MAX_ACCEPTS_PER_CALL (1000) pending connections per call and
 * hands each one to acceptCommonHandler(). It returns as soon as the
 * non-blocking accept reports that nothing is pending. Other accept errors
 * are logged. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* POSIX allows either EAGAIN or EWOULDBLOCK for "no pending
             * connection" on a non-blocking socket; neither is an error. */
            if (errno != EWOULDBLOCK && errno != EAGAIN)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0);
    }
}
 
// anet.c
/* Defined further down in this file. */
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len);

/* Accept one connection on the listening socket `s`.
 *
 * If `ip` is non-NULL, the peer address is written there as text (at most
 * ip_len bytes, via inet_ntop). If `port` is non-NULL, it receives the peer
 * port in host byte order.
 * Returns the new connection fd, or ANET_ERR with a message in `err`. */
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;   /* large enough for IPv4 and IPv6 peers */
    socklen_t salen = sizeof(sa);

    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        /* Every non-IPv4 peer is treated as IPv6 (these are TCP sockets). */
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}
 
//接收
static int anetGenericAccept(char *err, int s, struct sockaddr*sa, socklen_t *len) {
    //socket文件描述符
    int fd;
    while(1) {
        fd =accept(s,sa,len);
        if (fd == -1) {
            if (errno ==EINTR)
                continue;
            else {
               anetSetError(err, "accept: %s", strerror(errno));
                returnANET_ERR;
            }
        }
        break;
    }
    return fd;
}  
 
//公共处理
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
//创建新的客户端
    if ((c =createClient(fd)) == NULL) {
       redisLog(REDIS_WARNING,
            "Errorregistering fd event for the new client: %s (fd=%d)",
           strerror(errno),fd);
        close(fd); /* Maybe already closed, just ignore errors */
        return;
    }
    /* If maxclientdirective is set and this is one client more... close the
     * connection. Notethat we create the client instead to check before
     * for this condition,since now the socket is already set in non-blocking
     * mode and we cansend an error for free using the Kernel I/O */
    //如果超过了最大限额 #define REDIS_MAX_CLIENTS 10000
    if(listLength(server.clients) > server.maxclients) {
        char *err ="-ERR max number of clients reached\r\n";
 
        /* That's a besteffort error message, don't check write errors */
        if(write(c->fd,err,strlen(err)) == -1) {
            /* Nothing todo, Just to avoid the warning... */
        }
       server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
   server.stat_numconnections++;
    c->flags |= flags;
}
 
// Create a client.
/* Allocate and initialise a redisClient for socket `fd`.
 *
 * For fd != -1: the socket is made non-blocking, anetEnableTcpNoDelay() is
 * applied, anetKeepAlive() is applied when server.tcpkeepalive is set, and
 * readQueryFromClient is registered as the AE_READABLE handler with the new
 * client as its data. The client is finally appended to server.clients.
 * fd == -1 creates a client with no connection, which is not added to
 * server.clients.
 *
 * Returns the new client, or NULL if the read handler could not be
 * registered. In that case fd is closed and the client memory is freed. */
redisClient *createClient(int fd) {
    redisClient *c =zmalloc(sizeof(redisClient));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */

    if (fd != -1) {
        // Switch the socket to non-blocking mode.
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if(server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // Register the read event: incoming data is handled by readQueryFromClient
        // (request parsing is covered in the article "redis原理-2、对象以及命令解析与执行").
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // Start on database 0.
    selectDb(c,0);
    c->id =server.next_client_id++;
    c->fd = fd;// socket fd (-1 for a non connected client)
    c->name = NULL;
    c->bufpos = 0;
    c->querybuf =sdsempty();// query (input) buffer
    c->querybuf_peak =0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;// argument vector of the current command
    c->cmd =c->lastcmd = NULL;
    c->multibulklen =0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime =c->lastinteraction = server.unixtime;
    c->authenticated =0;
    c->replstate =REDIS_REPL_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->repl_ack_off =0;
    c->repl_ack_time =0;
    c->slave_listening_port = 0;
    c->slave_capa =SLAVE_CAPA_NONE;
    c->reply =listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = REDIS_BLOCKED_NONE;
    c->bpop.timeout =0;
    c->bpop.keys =dictCreate(&setDictType,NULL);
    c->bpop.target =NULL;
    c->bpop.numreplicas= 0;
    c->bpop.reploffset= 0;
    c->woff = 0;
    c->watched_keys =listCreate();
    c->pubsub_channels= dictCreate(&setDictType,NULL);
    c->pubsub_patterns= listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    // Only connected clients are tracked in server.clients.
    if (fd != -1)listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: