Redis源码整体运行流程详解
2014-11-09 22:24
405 查看
[-]
Redis Server端处理Client请求的流程图
main函数
initServer函数
acceptTcpHandler函数
createClient函数
readQueryFromClient函数
processCommand与call函数
sendReplyToClient函数
小结
本文所引用的源码全部来自Redis2.8.2版本。
Redis源码整体运行流程的相关文件是:redis.h, redis.c, networking.c, ae.h, ae.c。
转载请注明,本文出自:/article/7708318.html
1、创建事件循环
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;font-size:14px;">server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);</span>
2、创建TCP与Unix域套接字(Unix domain socket)Server,启动服务器,完成bind与listen
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;font-size:14px;">/* Open the TCP listening socket for the user commands. */
/* Excerpt from initServer. server.ipfd is an int array of listening
 * descriptors and server.ipfd_count receives how many were opened;
 * listenToPort performs the bind + listen, and the process exits if it
 * reports REDIS_ERR. */
if (listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
    exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
    if (server.sofd == ANET_ERR) {
        redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
        exit(1);
    }
}</span>
Redis2.8.2 TCP同时支持IPv4与IPv6,同时与之前版本的Redis不同,此版本支持多个TCP服务器,listenToPort函数主要还是调用anetTcpServer函数,完成socket()-->bind()-->listen(),下面详细查看下TCPServer的创建。上面代码中的anetUnixServer创建的是Unix域套接字(用于本机进程间通信,并不是UDP),其创建过程这里不展开。
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;">/* Bind socket `s` to address `sa` (of length `len`) and put it in the
 * listening state.
 *
 * Returns ANET_OK on success. If bind() or listen() fails, an error message
 * built from strerror(errno) is reported through anetSetError(err, ...), the
 * socket is closed here, and ANET_ERR is returned; callers must therefore
 * not close `s` again after a failure. */
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
    /* bind */
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    /* Use a backlog of 512 entries. We pass 511 to the listen() call because
     * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
     * which will thus give us a backlog of 512 entries */
    /* listen */
    if (listen(s, 511) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}
/* Create a listening TCP socket for `port` on `bindaddr`, restricted to the
 * address family `af`.
 *
 * Returns the listening socket descriptor on success, or ANET_ERR on
 * failure (with the reason reported through anetSetError(err, ...)).
 * The addrinfo list obtained from getaddrinfo() is always released before
 * returning, except on the early return where getaddrinfo() itself failed. */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af)
{
    int s, rv;
    char _port[6]; /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;
    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    /* The resulting address is meant to be bound and listened on. */
    hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
    /* hints.ai_protocol = IPPROTO_TCP; could also be set here. */
    /* getaddrinfo(const char *hostname, const char *servicename,
     *             const struct addrinfo *hint, struct addrinfo **res);
     *   hostname:    host name (here: the bind address)
     *   servicename: service name (here: the port rendered as a string)
     *   hint:        filter template; only ai_family, ai_flags, ai_protocol
     *                and ai_socktype may be set, all other fields must be 0
     *   res:         receives the list of matching addresses */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    /* Walk the candidate addresses. Only a socket() failure moves on to the
     * next candidate; any later failure jumps to `error`, and the first
     * socket that is fully set up jumps to `end`. */
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        /* Enable address reuse on the socket (setsockopt). */
        /* NOTE(review): on this failure path `s` is not closed in this
         * function; whether anetSetReuseAddr closes it is not visible in
         * this excerpt — confirm, otherwise the descriptor leaks. */
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        /* bind + listen; anetListen closes s itself on failure. */
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen) == ANET_ERR) goto error;
        goto end;
    }
    /* Reached only when the loop ran out of candidates, i.e. socket()
     * failed for every address (or the list was empty). */
    if (p == NULL) {
        anetSetError(err, "unable to bind socket");
        goto error;
    }
error:
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
/* IPv4 entry point: forwards to _anetTcpServer() with af = AF_INET and
 * returns its result unchanged.
 * Per the article author's note, bindaddr is NULL when server.ipfd_count
 * is 0 — TODO confirm against listenToPort. */
int anetTcpServer(char *err, int port, char *bindaddr)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET);
}
</span>
3、将listen的端口加入到事件监听中,进行监听,由aeCreateFileEvent函数完成,其注册的listen端口可读事件处理函数为acceptTcpHandler,这样在listen端口有新连接的时候会调用acceptTcpHandler,后者再accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。
[cpp] view
plaincopyprint?
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
/* Excerpt from initServer. These file events react to external connection
 * requests: acceptTcpHandler is registered for every TCP listening
 * descriptor and acceptUnixHandler for the Unix socket (both handlers live
 * in networking.c according to the article). A registration failure is
 * treated as unrecoverable and panics. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;">/* Readable-event callback for a TCP listening socket `fd`: accepts one
 * pending connection and hands the new descriptor to acceptCommonHandler.
 * The el, mask and privdata arguments are not used. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el); /* parameter intentionally unused */
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    /* cfd is the client descriptor returned by the accept; cip/cport
     * receive the peer address and port that are logged below. */
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    /* NOTE(review): an anet* result is compared against AE_ERR here, while
     * other anet calls in this article are checked against ANET_ERR;
     * presumably both constants have the same value — confirm. */
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    /* Create the redisClient for cfd; createClient (called from
     * acceptCommonHandler) registers readQueryFromClient as its read
     * callback. */
    acceptCommonHandler(cfd,0);
}</span>
第一步很简单即完成accept,主要关注第二步acceptCommonHandler函数
[cpp] view
plaincopyprint?
/* Shared tail of the accept handlers: wraps the accepted descriptor `fd`
 * in a new client, enforces the maxclients limit, updates the connection
 * statistics and ORs `flags` into the client's flags. */
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) { /* create the new client */
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        /* NOTE(review): createClient already closes fd on its failure
         * path, so this is a second close of the same descriptor. */
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    /* More clients are connected than server.maxclients allows: reject. */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}
[cpp] view
plaincopyprint?
/* Allocate and initialise a redisClient for descriptor `fd`.
 *
 * With fd != -1 the socket options are configured, readQueryFromClient is
 * registered as the readable-event callback and the client is appended to
 * server.clients. With fd == -1 a non connected client is created and none
 * of those steps happen.
 *
 * Returns the new client, or NULL if the read event could not be
 * registered; in that case fd has been closed and the client freed. */
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    /* In other words: commands always run in the context of a client, so
     * the server uses a fake client (fd == -1) to run commands internally. */
    if (fd != -1) {
        /* The next three calls configure socket options. */
        anetNonBlock(NULL,fd); /* non-blocking I/O */
        anetEnableTcpNoDelay(NULL,fd); /* TCP no-delay */
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); /* keep-alive */
        /* Register a readable file event for the accepted fd; its handler
         * is readQueryFromClient and the client is passed as private data. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    selectDb(c,0); /* select database 0 by default (selectDb is in db.c) */
    c->fd = fd; /* socket descriptor */
    c->name = NULL;
    c->bufpos = 0; /* number of reply bytes currently stored in c->buf */
    c->querybuf = sdsempty(); /* buffer accumulating the raw request */
    c->querybuf_peak = 0; /* largest querybuf length observed so far */
    c->reqtype = 0; /* request type, 0 = not determined yet */
    c->argc = 0; /* number of parsed arguments */
    c->argv = NULL; /* parsed arguments */
    c->cmd = c->lastcmd = NULL; /* command being / last executed */
    c->multibulklen = 0; /* bulk arguments still to be read */
    c->bulklen = -1; /* length of the current bulk argument, -1 = unknown */
    c->sentlen = 0;
    c->flags = 0; /* client flag bits (REDIS_MASTER, REDIS_BLOCKED, ...) */
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->reply = listCreate(); /* list of reply objects queued for the client */
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* The three bpop fields are used by blocking list operations, per the
     * article author. */
    c->bpop.keys = dictCreate(&setDictType,NULL);
    c->bpop.timeout = 0;
    c->bpop.target = NULL;
    c->io_keys = listCreate();
    c->watched_keys = listCreate(); /* used by transactions (CAS), per the author */
    listSetFreeMethod(c->io_keys,decrRefCountVoid);
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Only real (connected) clients are added to the server's client list. */
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c); /* initialise the transaction (MULTI) state */
    return c;
}
客户端的请求指令字符串始终存放在querybuf中,在对querybuf解析后,将指令参数的个数存放在argc中,具体的指令参数存放在argv中;但是服务器应答的结果有两种存储方式:buf字符串、reply列表。
[cpp] view
plaincopyprint?
/* Readable-event callback for a client socket: appends newly received
 * bytes to c->querybuf and then runs processInputBuffer on it.
 *
 * `privdata` is the redisClient registered with the event. The client is
 * freed when the peer closed the connection, when read() fails with an
 * error other than EAGAIN, or when the query buffer grows beyond
 * server.client_max_querybuf_len. el and mask are not used. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    server.current_client = c;
    readlen = REDIS_IOBUF_LEN; /* 1024 * 16 according to the author's note */
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* Make sure querybuf has room for readlen more bytes. */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    /* Read the client's request bytes right after the existing content. */
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Nothing available right now: treated as "read 0 bytes". */
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        /* Account for the new bytes in the sds header:
         * len += nread, free -= nread. */
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        server.current_client = NULL;
        return;
    }
    /* The accumulated request exceeds the configured maximum query buffer
     * length: log the client info and close it. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = getClientInfoString(c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    /* Parse (and execute) whatever complete requests are buffered. */
    processInputBuffer(c);
    server.current_client = NULL;
}
processInputBuffer函数主要用来处理请求的解析工作,redis有两种解析方式;行指令解析与多重指令解析,行指令解析直接忽略,下面详解多重指令解析。
[cpp] view
plaincopyprint?
/* Parse and execute the requests buffered in c->querybuf, one per loop
 * iteration, until the buffer is empty, the parser needs more data (or hit
 * an error), or the client is flagged as blocked / closing after reply. */
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;
        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
        /* Determine request type when unknown. */
        /* A leading '*' selects the multi-bulk parser, anything else the
         * inline (single line) parser. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK; /* multi-bulk request */
            } else {
                c->reqtype = REDIS_REQ_INLINE; /* inline request */
            }
        }
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            /* Run the parsed command. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}
多重指令解析的处理函数为processMultibulkBuffer,下面先简单介绍下Redis的通讯协议:
[plain] view
plaincopyprint?
以下是这个协议的一般形式:
*< 参数数量 > CR LF
$< 参数 1 的字节数量 > CR LF
< 参数 1 的数据 > CR LF
...
$< 参数 N 的字节数量 > CR LF
< 参数 N 的数据 > CR LF
举个例子,以下是一个命令协议的打印版本:
*3
$3
SET
$3
foo
$3
bar
这个命令的实际协议值如下:
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
[cpp] view
plaincopyprint?
/* Parse one multi-bulk request from c->querybuf into c->argv / c->argc.
 *
 * Example request:
 *   querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
 *
 * Parsing is resumable: c->multibulklen (arguments still missing) and
 * c->bulklen (length of the argument being read, -1 if not known yet) keep
 * the progress between calls, and the consumed prefix of querybuf is
 * trimmed before returning.
 *
 * Returns REDIS_OK when a whole request has been parsed (or when the
 * announced argument count is <= 0, leaving c->argc at 0), and REDIS_ERR
 * when more data is needed or a protocol error was reported. */
int processMultibulkBuffer(redisClient *c) {
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;
    if (c->multibulklen == 0) { /* no arguments pending: start of a new request */
        /* The client should have been reset */
        redisAssertWithInfo(c,NULL,c->argc == 0);
        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return REDIS_ERR;
        }
        /* Buffer should also contain \n */
        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return REDIS_ERR;
        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
        /* Convert the digits after '*' into a long long; ll is the number
         * of arguments that follow. */
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,pos);
            return REDIS_ERR;
        }
        pos = (newline-c->querybuf)+2; /* skip the \r\n */
        if (ll <= 0) {
            /* A count <= 0 carries no arguments: drop the count line from
             * querybuf and report success with argc still 0.
             * sdsrange keeps the [start,end] range in place, e.g.
             *   s = sdsnew("Hello World");
             *   sdsrange(s,1,-1); => "ello World" */
            sdsrange(c->querybuf,pos,-1);
            return REDIS_OK;
        }
        c->multibulklen = ll; /* number of arguments to read */
        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen); /* argument slots */
    }
    redisAssertWithInfo(c,NULL,c->multibulklen > 0);
    /* Extract the arguments. With the example request above,
     *   querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
     * pos is 4 at this point (just past "*3\r\n"). */
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) { /* "$<len>\r\n" header not parsed yet */
            /* In the example: newline = "\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n" */
            newline = strchr(c->querybuf+pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    addReplyError(c,"Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                }
                break;
            }
            /* Buffer should also contain \n */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;
            /* Every argument starts with '$' followed by its byte length. */
            if (c->querybuf[pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[pos]);
                setProtocolError(c,pos);
                return REDIS_ERR;
            }
            /* Parse the argument length into ll. */
            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
            if (!ok || ll < 0 || ll > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,pos);
                return REDIS_ERR;
            }
            /* Skip the length line and its \r\n; in the example pos becomes
             * 8 and now points at "SET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n". */
            pos += newline-(c->querybuf+pos)+2;
            if (ll >= REDIS_MBULK_BIG_ARG) { /* big argument (threshold is 1024*32 per the author) */
                size_t qblen;
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data. */
                /* sdsrange(querybuf,pos,-1) moves the bytes in [pos,len-1]
                 * to the front of the buffer and truncates the rest. */
                sdsrange(c->querybuf,pos,-1);
                pos = 0;
                qblen = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this string is
                 * going to contain. */
                /* True when the buffer holds fewer than ll+2 bytes, i.e. the
                 * argument has not fully arrived yet (reads are non-blocking). */
                if (qblen < ll+2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
            }
            c->bulklen = ll;
        }
        /* Read bulk argument */
        /* If the payload plus its trailing \r\n is not fully buffered yet,
         * stop and wait for more data. */
        if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else { /* the whole argument is available */
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (pos == 0 &&
                c->bulklen >= REDIS_MBULK_BIG_ARG &&
                (signed) sdslen(c->querybuf) == c->bulklen+2)
            {
                /* The buffer holds exactly this argument: hand the sds
                 * itself to the new object and start a fresh querybuf. */
                c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                c->querybuf = sdsempty();
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
                pos = 0;
            } else {
                /* Copy the argument bytes (e.g. "SET") into a new string
                 * object. */
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+pos,c->bulklen);
                pos += c->bulklen+2; /* skip payload and trailing \r\n */
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }
    /* Because client data is read without blocking, c->multibulklen != 0
     * here means the request has not been fully received yet. Either way
     * the bytes consumed so far are dropped from querybuf first. */
    /* Trim to pos */
    if (pos) sdsrange(c->querybuf,pos,-1);
    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return REDIS_OK;
    /* Still not read to process the command */
    return REDIS_ERR;
}
指令执行完之后,需要将得到的结果集返回给客户端,这部分是如何工作的,下面开始分析。
在networking.c中可以发现许多以addReply为前缀的函数名,这些函数都是用来处理各种不同类型的结果的,我们以典型的addReply函数为例,进行分析。
addReply函数
该函数第一步工作就是调用prepareClientToWrite函数为客户端创建一个写文件事件,事件的处理函数即将结果集发送给客户端的函数为sendReplyToClient.
[cpp] view
plaincopyprint?
/* Decide whether reply data may be queued for client `c` and, if needed,
 * register sendReplyToClient as the writable-event handler for its socket.
 *
 * Returns REDIS_OK when the caller may append reply data, REDIS_ERR when it
 * must not: the client is a master without REDIS_MASTER_FORCE_REPLY, it has
 * no usable descriptor (fd <= 0), or the write event could not be created. */
int prepareClientToWrite(redisClient *c) {
    /* Lua clients are always allowed to accumulate output; no event is
     * registered for them. */
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    /* The write handler is installed only when no output is pending yet
     * (empty static buffer and empty reply list) and the replication state
     * is NONE or ONLINE; the && chain short-circuits otherwise. */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}
第二步,就是根据相应的条件,将得到的结果robj数据存储到buf中或者reply链表中。对于存储的策略:redis优先将数据存储在固定大小的buf中,也就是redisClient结构体buf[REDIS_REPLY_CHUNK_BYTES]里,默认大小为16K。如果有数据没有发送完或c->buf空间不足,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。具体的处理函数为_addReplyToBuffer和_addReplyObjectToList两个函数。
[cpp] view
plaincopyprint?
/* Queue the string or integer encoded object `obj` as reply data for
 * client `c`. Nothing is queued when prepareClientToWrite() refuses the
 * client. The static buffer c->buf is tried first; if the data cannot go
 * there it is appended to the c->reply list. Any other encoding panics. */
void addReply(redisClient *c, robj *obj) {
    if (prepareClientToWrite(c) != REDIS_OK) return;
    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (obj->encoding == REDIS_ENCODING_RAW) { /* string payload */
        /* Try the static buffer first, fall back to the reply list. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj); /* append to the c->reply list */
    } else if (obj->encoding == REDIS_ENCODING_INT) { /* integer payload */
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
        /* Fast path: format the integer locally and append it to c->buf. */
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;
            len = ll2string(buf,sizeof(buf),(long)obj->ptr); /* integer -> text */
            if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        /* Slow path: obtain a string-encoded version of the object; the
         * reference taken here is released by decrRefCount below. */
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        redisPanic("Wrong obj->encoding in addReply()");
    }
}
[cpp] view
plaincopyprint?
/* Append `len` bytes from `s` to the client's static reply buffer.
 *
 * Per the article, the redisClient structure keeps outgoing data in two
 * places:
 *   1. the static response buffer
 *        int bufpos;                          bytes used in buf
 *        char buf[REDIS_REPLY_CHUNK_BYTES];   16 * 1024 bytes per the author
 *   2. the reply list
 *        list *reply;
 *        unsigned long reply_bytes;           total bytes of objects in the list
 *        int sentlen;                         bytes already sent
 * Once the reply list is in use, or when buf lacks room for the new data,
 * the data has to go to the list instead.
 *
 * Returns REDIS_OK when the bytes were copied into buf, and also when the
 * client has REDIS_CLOSE_AFTER_REPLY set, in which case nothing is copied.
 * Returns REDIS_ERR when the caller must use the reply list. */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos; /* free space left in c->buf */
    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;
    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;
    /* Append the reply bytes to buf. */
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}
/* Append object `o` to the client's reply list and update c->reply_bytes.
 *
 *   1. Empty list: `o` becomes a new tail node (its refcount is increased).
 *   2. Non-empty list: look at the tail node;
 *      a) if the tail's string plus o->ptr fits in REDIS_REPLY_CHUNK_BYTES,
 *         o->ptr is concatenated onto the tail's string;
 *      b) otherwise `o` becomes a new tail node.
 *
 * Nothing is queued when the client has REDIS_CLOSE_AFTER_REPLY set.
 * Afterwards the output buffer limits are checked via
 * asyncCloseClientOnOutputBufferLimitReached(). */
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *tail;
    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
    /* The list is empty. */
    if (listLength(c->reply) == 0) {
        incrRefCount(o); /* the list now holds a reference */
        listAddNodeTail(c->reply,o); /* append at the tail */
        c->reply_bytes += zmalloc_size_sds(o->ptr); /* memory used by o->ptr */
    } else {
        /* Fetch the object stored in the tail node. */
        tail = listNodeValue(listLast(c->reply));
        /* Append to this object when possible. */
        /* That is: when the tail's content plus the new reply does not
         * exceed REDIS_REPLY_CHUNK_BYTES, merge the new reply into it. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else { /* give the new reply a node of its own */
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
    /* Schedule the client for closing if its output buffer limit has been
     * exceeded. */
    asyncCloseClientOnOutputBufferLimitReached(c);
}
socket学习的不二选择。
Redis Server端处理Client请求的流程图
main函数
initServer函数
acceptTcpHandler函数
createClient函数
readQueryFromClient函数
processCommand与call函数
sendReplyToClient函数
小结
本文所引用的源码全部来自Redis2.8.2版本。
Redis源码整体运行流程的相关文件是:redis.h, redis.c, networking.c, ae.h, ae.c。
转载请注明,本文出自:/article/7708318.html
Redis Server端处理Client请求的流程图
main函数
main函数主要的功能为:调用initServerConfig函数,进行默认的redisServer数据结构的参数初始化;调用daemonize函数,为服务器开始守护进程,对于守护进程相关详细信息见/article/7708299.html;调用initServer函数,初始化服务器;调用loadServerConfig函数,读取Redis的配置文件,使用配置文件中的参数替换默认的参数值;调用aeMain函数,开启事件循环,整个服务器开始工作。
initServer函数
该函数主要为初始化服务器,需要初始化的内容比较多,主要有:1、创建事件循环
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;font-size:14px;">server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);</span>
2、创建TCP与Unix域套接字(Unix domain socket)Server,启动服务器,完成bind与listen
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;font-size:14px;">/* Open the TCP listening socket for the user commands. */
/* Excerpt from initServer. server.ipfd is an int array of listening
 * descriptors and server.ipfd_count receives how many were opened;
 * listenToPort performs the bind + listen, and the process exits if it
 * reports REDIS_ERR. */
if (listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
    exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
    if (server.sofd == ANET_ERR) {
        redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
        exit(1);
    }
}</span>
Redis2.8.2 TCP同时支持IPv4与IPv6,同时与之前版本的Redis不同,此版本支持多个TCP服务器,listenToPort函数主要还是调用anetTcpServer函数,完成socket()-->bind()-->listen(),下面详细查看下TCPServer的创建。上面代码中的anetUnixServer创建的是Unix域套接字(用于本机进程间通信,并不是UDP),其创建过程这里不展开。
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;">/* Bind socket `s` to address `sa` (of length `len`) and put it in the
 * listening state.
 *
 * Returns ANET_OK on success. If bind() or listen() fails, an error message
 * built from strerror(errno) is reported through anetSetError(err, ...), the
 * socket is closed here, and ANET_ERR is returned; callers must therefore
 * not close `s` again after a failure. */
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
    /* bind */
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    /* Use a backlog of 512 entries. We pass 511 to the listen() call because
     * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
     * which will thus give us a backlog of 512 entries */
    /* listen */
    if (listen(s, 511) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}
/* Create a listening TCP socket for `port` on `bindaddr`, restricted to the
 * address family `af`.
 *
 * Returns the listening socket descriptor on success, or ANET_ERR on
 * failure (with the reason reported through anetSetError(err, ...)).
 * The addrinfo list obtained from getaddrinfo() is always released before
 * returning, except on the early return where getaddrinfo() itself failed. */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af)
{
    int s, rv;
    char _port[6]; /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;
    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    /* The resulting address is meant to be bound and listened on. */
    hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
    /* hints.ai_protocol = IPPROTO_TCP; could also be set here. */
    /* getaddrinfo(const char *hostname, const char *servicename,
     *             const struct addrinfo *hint, struct addrinfo **res);
     *   hostname:    host name (here: the bind address)
     *   servicename: service name (here: the port rendered as a string)
     *   hint:        filter template; only ai_family, ai_flags, ai_protocol
     *                and ai_socktype may be set, all other fields must be 0
     *   res:         receives the list of matching addresses */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    /* Walk the candidate addresses. Only a socket() failure moves on to the
     * next candidate; any later failure jumps to `error`, and the first
     * socket that is fully set up jumps to `end`. */
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        /* Enable address reuse on the socket (setsockopt). */
        /* NOTE(review): on this failure path `s` is not closed in this
         * function; whether anetSetReuseAddr closes it is not visible in
         * this excerpt — confirm, otherwise the descriptor leaks. */
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        /* bind + listen; anetListen closes s itself on failure. */
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen) == ANET_ERR) goto error;
        goto end;
    }
    /* Reached only when the loop ran out of candidates, i.e. socket()
     * failed for every address (or the list was empty). */
    if (p == NULL) {
        anetSetError(err, "unable to bind socket");
        goto error;
    }
error:
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
/* IPv4 entry point: forwards to _anetTcpServer() with af = AF_INET and
 * returns its result unchanged.
 * Per the article author's note, bindaddr is NULL when server.ipfd_count
 * is 0 — TODO confirm against listenToPort. */
int anetTcpServer(char *err, int port, char *bindaddr)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET);
}
</span>
3、将listen的端口加入到事件监听中,进行监听,由aeCreateFileEvent函数完成,其注册的listen端口可读事件处理函数为acceptTcpHandler,这样在listen端口有新连接的时候会调用acceptTcpHandler,后者再accept这个新连接,然后就可以处理后续跟这个客户端连接相关的事件了。
[cpp] view
plaincopyprint?
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
/* Excerpt from initServer. These file events react to external connection
 * requests: acceptTcpHandler is registered for every TCP listening
 * descriptor and acceptUnixHandler for the Unix socket (both handlers live
 * in networking.c according to the article). A registration failure is
 * treated as unrecoverable and panics. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
acceptTcpHandler函数
上面介绍了,initServer完成listen端口后,会加入到事件循环中,该事件为可读事件,并记录处理函数为fe->rfileProc = acceptTcpHandler;该函数分两步操作:第一步用anetTcpAccept接受这个客户端连接;然后第二步初始化这个客户端连接的相关数据,将clientfd加入事件里面,设置的可读事件处理函数为readQueryFromClient,也就是读取客户端请求的函数。
[cpp] view
plaincopyprint?
<span style="font-family:Courier New;">/* Readable-event callback for a TCP listening socket `fd`: accepts one
 * pending connection and hands the new descriptor to acceptCommonHandler.
 * The el, mask and privdata arguments are not used. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el); /* parameter intentionally unused */
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    /* cfd is the client descriptor returned by the accept; cip/cport
     * receive the peer address and port that are logged below. */
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    /* NOTE(review): an anet* result is compared against AE_ERR here, while
     * other anet calls in this article are checked against ANET_ERR;
     * presumably both constants have the same value — confirm. */
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    /* Create the redisClient for cfd; createClient (called from
     * acceptCommonHandler) registers readQueryFromClient as its read
     * callback. */
    acceptCommonHandler(cfd,0);
}</span>
第一步很简单即完成accept,主要关注第二步acceptCommonHandler函数
[cpp] view
plaincopyprint?
/* Shared tail of the accept handlers: wraps the accepted descriptor `fd`
 * in a new client, enforces the maxclients limit, updates the connection
 * statistics and ORs `flags` into the client's flags. */
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) { /* create the new client */
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        /* NOTE(review): createClient already closes fd on its failure
         * path, so this is a second close of the same descriptor. */
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    /* More clients are connected than server.maxclients allows: reject. */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}
createClient函数
此函数用来为新连接的客户端初始化一个redisClient数据结构,该数据结构有比较多的参数,详见redis.h。该函数完成两个操作,第一、为客户端创建事件处理函数readQueryFromClient专门接收客户端发来的指令,第二、初始化redisClient数据结构相关参数。[cpp] view
plaincopyprint?
/* Allocate and initialise a redisClient for descriptor `fd`.
 *
 * With fd != -1 the socket options are configured, readQueryFromClient is
 * registered as the readable-event callback and the client is appended to
 * server.clients. With fd == -1 a non connected client is created and none
 * of those steps happen.
 *
 * Returns the new client, or NULL if the read event could not be
 * registered; in that case fd has been closed and the client freed. */
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    /* In other words: commands always run in the context of a client, so
     * the server uses a fake client (fd == -1) to run commands internally. */
    if (fd != -1) {
        /* The next three calls configure socket options. */
        anetNonBlock(NULL,fd); /* non-blocking I/O */
        anetEnableTcpNoDelay(NULL,fd); /* TCP no-delay */
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); /* keep-alive */
        /* Register a readable file event for the accepted fd; its handler
         * is readQueryFromClient and the client is passed as private data. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    selectDb(c,0); /* select database 0 by default (selectDb is in db.c) */
    c->fd = fd; /* socket descriptor */
    c->name = NULL;
    c->bufpos = 0; /* number of reply bytes currently stored in c->buf */
    c->querybuf = sdsempty(); /* buffer accumulating the raw request */
    c->querybuf_peak = 0; /* largest querybuf length observed so far */
    c->reqtype = 0; /* request type, 0 = not determined yet */
    c->argc = 0; /* number of parsed arguments */
    c->argv = NULL; /* parsed arguments */
    c->cmd = c->lastcmd = NULL; /* command being / last executed */
    c->multibulklen = 0; /* bulk arguments still to be read */
    c->bulklen = -1; /* length of the current bulk argument, -1 = unknown */
    c->sentlen = 0;
    c->flags = 0; /* client flag bits (REDIS_MASTER, REDIS_BLOCKED, ...) */
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->reply = listCreate(); /* list of reply objects queued for the client */
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* The three bpop fields are used by blocking list operations, per the
     * article author. */
    c->bpop.keys = dictCreate(&setDictType,NULL);
    c->bpop.timeout = 0;
    c->bpop.target = NULL;
    c->io_keys = listCreate();
    c->watched_keys = listCreate(); /* used by transactions (CAS), per the author */
    listSetFreeMethod(c->io_keys,decrRefCountVoid);
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Only real (connected) clients are added to the server's client list. */
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c); /* initialise the transaction (MULTI) state */
    return c;
}
客户端的请求指令字符串始终存放在querybuf中,在对querybuf解析后,将指令参数的个数存放在argc中,具体的指令参数存放在argv中;但是服务器应答的结果有两种存储方式:buf字符串、reply列表。
readQueryFromClient函数
readQueryFromClient函数用来读取客户端的请求命令行数据,并调用processInputBuffer函数依照redis通讯协议对数据进行解析。服务器使用最原始的read函数来读取客户端发送来的请求命令,并将字符串存储在querybuf中,根据需要对querybuf进行扩展。[cpp] view
plaincopyprint?
/* Readable-event callback for a client socket: appends newly received
 * bytes to c->querybuf and then runs processInputBuffer on it.
 *
 * `privdata` is the redisClient registered with the event. The client is
 * freed when the peer closed the connection, when read() fails with an
 * error other than EAGAIN, or when the query buffer grows beyond
 * server.client_max_querybuf_len. el and mask are not used. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    server.current_client = c;
    readlen = REDIS_IOBUF_LEN; /* 1024 * 16 according to the author's note */
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* Make sure querybuf has room for readlen more bytes. */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    /* Read the client's request bytes right after the existing content. */
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Nothing available right now: treated as "read 0 bytes". */
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        /* Account for the new bytes in the sds header:
         * len += nread, free -= nread. */
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        server.current_client = NULL;
        return;
    }
    /* The accumulated request exceeds the configured maximum query buffer
     * length: log the client info and close it. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = getClientInfoString(c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    /* Parse (and execute) whatever complete requests are buffered. */
    processInputBuffer(c);
    server.current_client = NULL;
}
processInputBuffer函数主要用来处理请求的解析工作,redis有两种解析方式;行指令解析与多重指令解析,行指令解析直接忽略,下面详解多重指令解析。
[cpp] view
plaincopyprint?
/* Parse and execute the requests buffered in c->querybuf, one per loop
 * iteration, until the buffer is empty, the parser needs more data (or hit
 * an error), or the client is flagged as blocked / closing after reply. */
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;
        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
        /* Determine request type when unknown. */
        /* A leading '*' selects the multi-bulk parser, anything else the
         * inline (single line) parser. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK; /* multi-bulk request */
            } else {
                c->reqtype = REDIS_REQ_INLINE; /* inline request */
            }
        }
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            /* Run the parsed command. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}
多重指令解析的处理函数为processMultibulkBuffer,下面先简单介绍下Redis的通讯协议:
[plain] view
plaincopyprint?
以下是这个协议的一般形式:
*< 参数数量 > CR LF
$< 参数 1 的字节数量 > CR LF
< 参数 1 的数据 > CR LF
...
$< 参数 N 的字节数量 > CR LF
< 参数 N 的数据 > CR LF
举个例子,以下是一个命令协议的打印版本:
*3
$3
SET
$3
foo
$3
bar
这个命令的实际协议值如下:
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
[cpp] view
plaincopyprint?
/**
 * Parse a multi-bulk request from c->querybuf into c->argv / c->argc.
 *
 * Example input: querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
 *
 * Parsing progress is stored on the client: c->multibulklen is the number of
 * arguments still to be read and c->bulklen is the byte length of the
 * argument currently being read (-1 while that length is still unknown).
 * Bytes that were consumed are removed from c->querybuf before returning, so
 * the function can be called again when more data has been appended.
 *
 * Returns REDIS_OK when every argument of the request has been read, or when
 * the request declared a count <= 0 (c->argc is then left untouched).
 * Returns REDIS_ERR on a protocol error (reported through addReplyError /
 * setProtocolError) or when the buffered data is not yet a complete request.
 */
int processMultibulkBuffer(redisClient *c) {
char *newline = NULL;
int pos = 0, ok;
long long ll;
if (c->multibulklen == 0) {// no arguments pending: this is the start of a new request
/* The client should have been reset */
redisAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError(c,0);
}
return REDIS_ERR;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
return REDIS_ERR;
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
// Convert the digits between '*' and '\r' to a long long stored in ll: the number of arguments that follow
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError(c,pos);
return REDIS_ERR;
}
pos = (newline-c->querybuf)+2;// index of the first byte after the "*<count>\r\n" line
if (ll <= 0) {// count <= 0: there are no arguments to read, only the count line is consumed
/** s = sdsnew("Hello World");
* sdsrange(s,1,-1); => "ello World"
*/
sdsrange(c->querybuf,pos,-1);// keep only the bytes that follow the count line
return REDIS_OK;
}
c->multibulklen = ll;// number of arguments this request carries
/* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*) * c->multibulklen);// one robj* slot per argument
}
redisAssertWithInfo(c,NULL,c->multibulklen > 0);
/**
 * Extract the arguments one by one.
 * With the example request, querybuf = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
 * and pos = 4 when a fresh request reaches this point.
 */
while(c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {// the "$<len>\r\n" line of the current argument has not been parsed yet
/* e.g. newline -> "\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n" */
newline = strchr(c->querybuf+pos,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big bulk count string");
setProtocolError(c,0);
}
break;
}
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
break;
// Every argument is introduced by '$' followed by its length in bytes
if (c->querybuf[pos] != '$') {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
c->querybuf[pos]);
setProtocolError(c,pos);
return REDIS_ERR;
}
// Parse the argument length into ll; negative values and values above 512*1024*1024 are rejected
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
if (!ok || ll < 0 || ll > 512*1024*1024) {
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError(c,pos);
return REDIS_ERR;
}
// e.g. pos becomes 8 for the first argument of the example request
pos += newline-(c->querybuf+pos)+2;// skip the length line and its \r\n; pos now indexes the argument data
if (ll >= REDIS_MBULK_BIG_ARG) {// big argument: its length reaches the REDIS_MBULK_BIG_ARG threshold
size_t qblen;
/* If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data. */
/**
 * sdsrange(querybuf,pos,-1) keeps only the bytes from index pos to the
 * end of the string, which then start at index 0 (see the "Hello World"
 * example earlier in this function).
 */
sdsrange(c->querybuf,pos,-1);// drop everything that precedes the argument data
pos = 0;
qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
if (qblen < ll+2)// the argument plus its trailing \r\n is not fully buffered yet
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
}
c->bulklen = ll;
}
/* Read bulk argument */
// Fewer than bulklen+2 bytes after pos means the argument has not completely arrived
if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */
break;
} else {// the whole argument and its trailing \r\n are in the buffer
/* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
if (pos == 0 &&
c->bulklen >= REDIS_MBULK_BIG_ARG &&
(signed) sdslen(c->querybuf) == c->bulklen+2)
{// the buffer holds exactly this argument: give c->querybuf itself to the new object, then start a fresh buffer
c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
sdsIncrLen(c->querybuf,-2); /* remove CRLF */
c->querybuf = sdsempty();
/* Assume that if we saw a fat argument we'll see another one
* likely... */
c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
pos = 0;
} else {
// Build a new string object from the bulklen bytes of the argument (e.g. "SET")
c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;// step over the argument data and its trailing \r\n
}
c->bulklen = -1;
c->multibulklen--;
}
}
/**
 * If c->multibulklen != 0 at this point the request has not been read
 * completely. In every case the bytes consumed so far are removed from
 * querybuf, so the next call starts at the first unparsed byte.
 */
/* Trim to pos */
if (pos) sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulklen == 0 */
if (c->multibulklen == 0) return REDIS_OK;
/* Still not ready to process the command */
return REDIS_ERR;
}
processCommand与call函数
客户端指令解析完之后,需要执行该指令,执行指令的两个函数为processCommand与call函数,这两个函数除了单纯的执行指令外,还做了许多其他的工作,这里不详解,看代码仅仅找到指令如何执行还是很简单的。指令执行完之后,需要将得到的结果集返回给客户端,这部分是如何工作的,下面开始分析。
在networking.c中可以发现许多以addReply为前缀的函数名,这些函数都是用来处理各种不同类型的结果的,我们以典型的addReply函数为例,进行分析。
addReply函数
该函数第一步工作就是调用prepareClientToWrite函数为客户端创建一个写文件事件,事件的处理函数即将结果集发送给客户端的函数为sendReplyToClient.
[cpp] view
plaincopyprint?
/* Decide whether reply data may be queued for client c and, when the client
 * has no output pending yet, register sendReplyToClient as the AE_WRITABLE
 * handler for its socket.
 *
 * Returns REDIS_OK when the caller may go on queueing the reply, REDIS_ERR
 * when it must not (or when the write event could not be created). */
int prepareClientToWrite(redisClient *c) {
    /* Clients flagged REDIS_LUA_CLIENT are always accepted and never get a
     * write event installed here. */
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;

    /* A client flagged REDIS_MASTER gets no reply unless
     * REDIS_MASTER_FORCE_REPLY is set as well. */
    if (c->flags & REDIS_MASTER) {
        if (!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    }

    /* Fake client */
    if (c->fd <= 0) return REDIS_ERR;

    /* Output already pending in the static buffer or in the reply list:
     * no event is created on this path. */
    if (c->bufpos != 0 || listLength(c->reply) != 0) return REDIS_OK;

    /* The event is only installed for replication states NONE and ONLINE. */
    if (c->replstate != REDIS_REPL_NONE &&
        c->replstate != REDIS_REPL_ONLINE)
    {
        return REDIS_OK;
    }

    if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                          sendReplyToClient, c) == AE_ERR)
    {
        return REDIS_ERR;
    }
    return REDIS_OK;
}
第二步,就是根据相应的条件,将得到的结果robj数据存储到buf中或者reply链表中。对于存储的策略:redis优先将数据存储在固定大小的buf中,也就是redisClient结构体buf[REDIS_REPLY_CHUNK_BYTES]里,默认大小为16K。如果有数据没有发送完或c->buf空间不足,就会放到c->reply链表里面,链表每个节点都是内存buf,后来的数据放入最后面。具体的处理函数为_addReplyToBuffer和_addReplyObjectToList两个函数。
[cpp] view
plaincopyprint?
/* Queue the content of obj as reply data for client c.
 *
 * The data goes into the static buffer c->buf when _addReplyToBuffer()
 * accepts it, otherwise it is appended to the c->reply list. Nothing is
 * queued when prepareClientToWrite() does not return REDIS_OK. Only
 * REDIS_ENCODING_RAW and REDIS_ENCODING_INT objects are supported; any other
 * encoding triggers redisPanic(). */
void addReply(redisClient *c, robj *obj) {
    robj *decoded;

    if (prepareClientToWrite(c) != REDIS_OK) return;

    /* This is an important place where we can avoid copy-on-write when a
     * saving child is running: if the encoding is RAW and the static buffer
     * has room, the bytes are copied out without touching the refcount
     * field of the object. */
    if (obj->encoding == REDIS_ENCODING_RAW) {
        /* Try the static buffer first, fall back to the reply list. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        return;
    }

    if (obj->encoding != REDIS_ENCODING_INT) {
        redisPanic("Wrong obj->encoding in addReply()");
        return;
    }

    /* Integer-encoded object. Optimization: when the reply list is empty and
     * the static buffer has at least 32 free bytes (more than the max chars
     * a 64 bit integer can take as string), format the number on the stack
     * and skip decoding the object. */
    if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
        char digits[32];
        int ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);

        if (_addReplyToBuffer(c,digits,ndigits) == REDIS_OK) return;
        /* Otherwise continue with the normal path below; this should never
         * happen since room was verified just above. */
    }

    /* General path: obtain a string object for the integer, queue it, then
     * drop the reference returned by getDecodedObject(). */
    decoded = getDecodedObject(obj);
    if (_addReplyToBuffer(c,decoded->ptr,sdslen(decoded->ptr)) != REDIS_OK)
        _addReplyObjectToList(c,decoded);
    decrRefCount(decoded);
}
[cpp] view
plaincopyprint?
/**
 * Append len bytes from s to the client's static reply buffer.
 *
 * Reply data for a client is kept in two places on the redisClient:
 *   1. the static buffer: c->buf, with c->bufpos as its current fill level;
 *   2. the list c->reply, used once the static buffer cannot take the data.
 * This function only ever writes to the static buffer.
 *
 * Returns REDIS_OK when the bytes were copied, and also when the client is
 * flagged REDIS_CLOSE_AFTER_REPLY (the data is then silently discarded).
 * Returns REDIS_ERR when the reply list already holds entries or when the
 * buffer lacks room, in which case the caller has to use the reply list.
 */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t room;

    /* The connection is going away after the current reply: accept and
     * drop anything new. */
    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* Once the reply list has entries nothing more can go into the static
     * buffer; likewise when the string does not fit in the space left. */
    room = sizeof(c->buf) - c->bufpos;
    if (listLength(c->reply) > 0 || len > room) return REDIS_ERR;

    /* Copy the reply bytes to the end of the buffered data. */
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return REDIS_OK;
}
/**
 * Queue string object o on the client's reply list (c->reply).
 *
 *   - Empty list: o is appended as a new tail node.
 *   - Non-empty list: when the tail node holds a string and
 *     sdslen(tail) + sdslen(o) <= REDIS_REPLY_CHUNK_BYTES, the bytes of o
 *     are concatenated onto the tail node; otherwise o is appended as a new
 *     tail node.
 *
 * c->reply_bytes is kept in sync using zmalloc_size_sds(). Nothing is queued
 * for a client flagged REDIS_CLOSE_AFTER_REPLY.
 */
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *last = NULL;
    int merge_into_tail = 0;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    /* Look at the tail node only when the list is not empty, and append to
     * that object when possible. */
    if (listLength(c->reply) != 0) {
        last = listNodeValue(listLast(c->reply));
        merge_into_tail =
            last->ptr != NULL &&
            sdslen(last->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES;
    }

    if (merge_into_tail) {
        /* Remove the old size from the accounting, concatenate, then add
         * the new size back. dupLastObjectIfNeeded() supplies the object to
         * write into (presumably an unshared copy of the tail; confirm
         * against its definition). */
        c->reply_bytes -= zmalloc_size_sds(last->ptr);
        last = dupLastObjectIfNeeded(c->reply);
        last->ptr = sdscatlen(last->ptr,o->ptr,sdslen(o->ptr));
        c->reply_bytes += zmalloc_size_sds(last->ptr);
    } else {
        /* The list takes its own reference to o in a node of its own. */
        incrRefCount(o);
        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    }

    /* Let the output buffer limit check run for this client. */
    asyncCloseClientOnOutputBufferLimitReached(c);
}
sendReplyToClient函数
终于到了最后一步,把c->buf与c->reply中的数据发送给客户端即可,发送同样使用的是最原始的write函数。发送完成之后,redis会删除该客户端的写事件;如果客户端带有REDIS_CLOSE_AFTER_REPLY标志,还会将该客户端释放。代码比较简单,不详细解释。
小结
本文粗略的介绍了Redis整体运行的流程,从服务器的角度,介绍Redis是如何初始化,创建socket,接收客户端请求,解析请求及指令的执行,反馈执行的结果集给客户端等。如果读者想更深入的了解Redis的运行机制,需要亲自阅读源码,本文可以用作参考。同时也是学习linux socket编程的好工具,原本简简单单的socket->bind->listen->accept->read->write也可以用来做许多高效的业务,是Linuxsocket学习的不二选择。
相关文章推荐
- Redis源码整体运行流程详解
- Redis源码学习笔记(一)-- 整体运行流程详解
- Redis运行流程源码解析
- Redis运行流程源码解析--转载
- Redis运行流程源码解析
- Redis运行流程源码解析
- Redis运行流程源码解析
- Redis运行流程源码解析 .
- OkHttp3源码详解(二整体流程)
- redis 持久化详解,RDB和AOF是什么?他们优缺点是什么?运行流程是什么?
- 详解thinkphp的C函数源码以及运行流程和使用注意
- Redis运行流程源码解析
- Redis运行流程源码解析
- Redis运行流程源码解析
- Redis运行流程源码解析
- Redis运行流程源码解析--转载
- Redis运行流程源码解析
- 第六章 注解式控制器详解——注解式控制器运行流程及处理器定义
- LwIP 协议栈源码详解 ——TCP/IP 协议的实现(十:ARP 层流程)
- 用Quickfix详解Fix(二)--运行源码