Redis Tutorial: Source Code Analysis of the Transaction Mechanism
2018-03-03 15:55
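Abstract: Redis supports simple transactions. The MULTI, DISCARD, EXEC, WATCH and UNWATCH commands cover many transactional scenarios. I introduced the basic usage of Redis transactions in the article "Redis事务机制–基本介绍" (Redis Transaction Mechanism: A Basic Introduction). This article walks through the source code to explain how transactions are actually implemented. If you spot a mistake, corrections are welcome.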
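Preface
The Redis source code is written in C, so readers of this article need a basic grounding in C. The relevant source files are in the src folder under the Redis root directory.
Basic data structures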
The client struct, defined in the header file server.h, describes each client connected to the server. Its source is as follows:

    /* With multiplexing we need to take per-client state.
     * Clients are taken in a linked list. */
    typedef struct client {
        uint64_t id;            /* Client incremental unique ID. */
        int fd;                 /* Client socket. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        robj *name;             /* As set by CLIENT SETNAME. */
        sds querybuf;           /* Buffer we use to accumulate client queries. */
        sds pending_querybuf;   /* If this is a master, this buffer represents the
                                   yet not applied replication stream that we
                                   are receiving from the master. */
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
        int argc;               /* Num of arguments of current command. */
        robj **argv;            /* Arguments of current command. */
        struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
        int reqtype;            /* Request protocol type: PROTO_REQ_* */
        int multibulklen;       /* Number of multi bulk arguments left to read. */
        long bulklen;           /* Length of bulk argument in multi bulk request. */
        list *reply;            /* List of reply objects to send to the client. */
        unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
        size_t sentlen;         /* Amount of bytes already sent in the current
                                   buffer or object being sent. */
        time_t ctime;           /* Client creation time. */
        time_t lastinteraction; /* Time of the last interaction, used for timeout */
        time_t obuf_soft_limit_reached_time;
        int flags;              /* Client flags: CLIENT_* macros. */
        int authenticated;      /* When requirepass is non-NULL. */
        int replstate;          /* Replication state if this is a slave. */
        int repl_put_online_on_ack; /* Install slave write handler on ACK. */
        int repldbfd;           /* Replication DB file descriptor. */
        off_t repldboff;        /* Replication DB file offset. */
        off_t repldbsize;       /* Replication DB file size. */
        sds replpreamble;       /* Replication DB preamble. */
        long long read_reploff; /* Read replication offset if this is a master. */
        long long reploff;      /* Applied replication offset if this is a master. */
        long long repl_ack_off; /* Replication ack offset, if this is a slave. */
        long long repl_ack_time;/* Replication ack time, if this is a slave. */
        long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                           copying this slave output buffer
                                           should use. */
        char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
        int slave_listening_port; /* As configured with: SLAVECONF listening-port */
        char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
        int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
        multiState mstate;      /* MULTI/EXEC state */
        int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
        blockingState bpop;     /* blocking state */
        long long woff;         /* Last write global replication offset. */
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */

        /* Response buffer */
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES];
    } client;
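Three of these fields matter for this article. flags holds the client's state bits, and the following bit marks a client that is inside a transaction:

    #define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */

cmd and lastcmd describe the command currently being executed and the one executed last. Their type is defined as follows: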

    struct redisCommand {
        char *name;
        redisCommandProc *proc;
        int arity;
        char *sflags; /* Flags as string representation, one char per flag. */
        int flags;    /* The actual flags, obtained from the 'sflags' field. */
        /* Use a function to determine keys arguments in a command line.
         * Used for Redis Cluster redirect. */
        redisGetKeysProc *getkeys_proc;
        /* What keys should be loaded in background when calling this command? */
        int firstkey; /* The first argument that's a key (0 = no keys) */
        int lastkey;  /* The last argument that's a key */
        int keystep;  /* The step between first and last key */
        long long microseconds, calls;
    };

The proc field of this struct points to the function that implements the command.

    typedef void redisCommandProc(client *c);
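To make the role of proc concrete, here is a minimal sketch of a command table that dispatches through a function pointer. Everything in it (toy_client, toy_command, lookup_command, dispatch and the two handlers) is hypothetical and much simpler than the real Redis code; it only illustrates the pattern.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, heavily reduced stand-ins for client and redisCommand. */
typedef struct toy_client {
    int last_reply;              /* records which handler ran last */
} toy_client;

typedef void toy_command_proc(toy_client *c);

typedef struct toy_command {
    const char *name;            /* command name as typed by the user */
    toy_command_proc *proc;      /* function that implements the command */
} toy_command;

static void ping_command(toy_client *c) { c->last_reply = 1; }
static void echo_command(toy_client *c) { c->last_reply = 2; }

static toy_command command_table[] = {
    {"ping", ping_command},
    {"echo", echo_command},
};

/* Linear search is enough for a sketch; returns NULL for unknown names. */
static toy_command *lookup_command(const char *name) {
    size_t n = sizeof(command_table) / sizeof(command_table[0]);
    for (size_t i = 0; i < n; i++) {
        if (strcmp(command_table[i].name, name) == 0) return &command_table[i];
    }
    return NULL;
}

/* Look the command up and call it through its proc pointer.
 * Returns 0 on success, -1 if the command is unknown. */
int dispatch(toy_client *c, const char *name) {
    toy_command *cmd = lookup_command(name);
    if (cmd == NULL) return -1;
    cmd->proc(c);
    return 0;
}
```

Calling dispatch(&c, "ping") runs ping_command without the caller knowing which function that is. Storing the pointer on the client, as Redis does with c->cmd, is what later lets the server compare c->cmd->proc against specific handlers.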
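The other struct we need to know is multiState, which describes the queued command sequence. It is declared in server.h alongside client (the functions that operate on it live in multi.c). Its source is as follows:

    typedef struct multiCmd {
        robj **argv;
        int argc;
        struct redisCommand *cmd;
    } multiCmd;

    typedef struct multiState {
        multiCmd *commands;     /* Array of MULTI commands */
        int count;              /* Total number of MULTI commands */
        int minreplicas;        /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;

multiState describes the set of commands in a transaction, including commands (an array of multiCmd entries, one per queued command) and count (the number of queued commands).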
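Starting and ending a transaction
A transaction is started by executing the MULTI command. Its implementation is in multi.c:

    void multiCommand(client *c) {
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
            return;
        }
        c->flags |= CLIENT_MULTI;
        addReply(c,shared.ok);
    }

The server first checks whether the client is already in the CLIENT_MULTI state. If it is, an error is returned, which explains why MULTI cannot be issued inside a transaction. Otherwise the server sets CLIENT_MULTI in the client's flags and replies OK, as the figure below shows. Because the flag is stored on the client, starting a transaction affects the whole connection.
![](https://img-blog.csdn.net/20180303155409122?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZHVvZHVveGlvbmcyMDEx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)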
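The check-then-set logic can be tried in isolation. The sketch below assumes a hypothetical toy_client that only has a flags field, and returns the reply text instead of writing it to a socket. Only the bit value (1<<3) comes from the source quoted earlier.

```c
#include <assert.h>
#include <string.h>

#define TOY_CLIENT_MULTI (1<<3)   /* same bit position as CLIENT_MULTI */

/* Hypothetical stand-in for client: just the state bits. */
typedef struct toy_client {
    int flags;
} toy_client;

/* Same shape as multiCommand: refuse nesting, otherwise set the bit.
 * Returns the reply text so the caller can inspect it. */
const char *toy_multi(toy_client *c) {
    if (c->flags & TOY_CLIENT_MULTI) {
        return "ERR MULTI calls can not be nested";
    }
    c->flags |= TOY_CLIENT_MULTI;
    return "OK";
}
```

The first call on a fresh client returns "OK" and sets the bit. A second call on the same client hits the nesting branch, because the state is kept per client rather than per command.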
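So how is a transaction ended? Normally it is closed by hand with the DISCARD command, which is also implemented in multi.c:

    void discardCommand(client *c) {
        if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"DISCARD without MULTI");
            return;
        }
        discardTransaction(c);
        addReply(c,shared.ok);
    }

This looks a lot like the MULTI code above. It first checks whether the client is in the CLIENT_MULTI state and returns an error if it is not, because DISCARD is only valid inside a transaction. After the check, it ends the transaction by calling discardTransaction.
![](https://img-blog.csdn.net/20180303155356667?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZHVvZHVveGlvbmcyMDEx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
Why delegate to a separate function? Let's look at the source. discardTransaction and the functions it calls are all in multi.c, shown below (unwatchAllKeys is analyzed in a later section):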

    void discardTransaction(client *c) {
        freeClientMultiState(c);
        initClientMultiState(c);
        c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
        unwatchAllKeys(c);
    }

    /* Release all the resources associated with MULTI/EXEC state */
    void freeClientMultiState(client *c) {
        int j;

        for (j = 0; j < c->mstate.count; j++) {
            int i;
            multiCmd *mc = c->mstate.commands+j;

            for (i = 0; i < mc->argc; i++)
                decrRefCount(mc->argv[i]);
            zfree(mc->argv);
        }
        zfree(c->mstate.commands);
    }

    /* Client state initialization for MULTI/EXEC */
    void initClientMultiState(client *c) {
        c->mstate.commands = NULL;
        c->mstate.count = 0;
    }

So closing a transaction takes more work than opening one. The queued commands must be freed (this is essential, otherwise memory would be wasted and the server could eventually run out of it), the queue state must be reset, and the transaction flags must be cleared. Putting this work in one function makes it easy to call from several places and keeps the code tidy.
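The flag-clearing line deserves a closer look, since it clears three bits at once and leaves every other bit alone. The sketch below is an illustration under assumptions: toy_client is hypothetical, the queue is reduced to a counter, and the bit positions other than (1<<3) are chosen for this example only.

```c
#include <assert.h>

#define TOY_UNRELATED    (1<<0)   /* some flag that has nothing to do with MULTI */
#define TOY_CLIENT_MULTI (1<<3)   /* same bit position as CLIENT_MULTI */
#define TOY_DIRTY_CAS    (1<<5)   /* illustrative bit position */
#define TOY_DIRTY_EXEC   (1<<12)  /* illustrative bit position */

/* Hypothetical stand-in for client. */
typedef struct toy_client {
    int flags;
    int queued;   /* stands in for mstate.count */
} toy_client;

/* Reset the queue, then clear only the transaction-related bits.
 * ~(A|B|C) has zeros exactly at those three positions, so the AND
 * keeps every other flag unchanged. */
void toy_discard(toy_client *c) {
    c->queued = 0;
    c->flags &= ~(TOY_CLIENT_MULTI | TOY_DIRTY_CAS | TOY_DIRTY_EXEC);
}
```

A client that had TOY_CLIENT_MULTI, TOY_DIRTY_CAS and TOY_UNRELATED set ends up with only TOY_UNRELATED after toy_discard.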
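How commands are queued
Once a transaction is open, the commands a client sends are placed in a queue instead of being executed. A command that is rejected while queueing, for example because of a syntax error, dooms the transaction: the later EXEC discards the whole queue. So how does Redis queue commands? Below is the part of the command-processing code (processCommand in server.c) that deals with transactions:

    ......
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    ......

While processing a command, if the client is in the transaction state and the command is not EXEC, DISCARD, MULTI or WATCH, the server calls queueMultiCommand to queue it and replies QUEUED. Otherwise it calls call to execute the command, records the master replication offset in c->woff (replication will be covered in a separate article), and wakes up clients blocked on lists that have received new values (BLPOP blocks when the list is empty, and ready_keys records those keys). queueMultiCommand is the function that does the queueing, so let's look at its implementation.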
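The queue-or-execute decision can be sketched on its own. Note the simplification: Redis compares c->cmd->proc against four handler functions, while this hypothetical version compares command names as strings. toy_client, is_transaction_control and toy_process are invented for the example.

```c
#include <assert.h>
#include <string.h>

#define TOY_CLIENT_MULTI (1<<3)   /* same bit position as CLIENT_MULTI */

/* Hypothetical stand-in for client. */
typedef struct toy_client {
    int flags;
    int queued;     /* number of commands waiting for EXEC */
    int executed;   /* number of commands run immediately */
} toy_client;

/* Commands that control the transaction itself are never queued. */
static int is_transaction_control(const char *name) {
    return strcmp(name, "exec") == 0 || strcmp(name, "discard") == 0 ||
           strcmp(name, "multi") == 0 || strcmp(name, "watch") == 0;
}

/* Returns "QUEUED" when the command was deferred, "RUN" when it ran now. */
const char *toy_process(toy_client *c, const char *name) {
    if ((c->flags & TOY_CLIENT_MULTI) && !is_transaction_control(name)) {
        c->queued++;
        return "QUEUED";
    }
    c->executed++;
    return "RUN";
}
```

Outside a transaction every command takes the "RUN" branch. Inside one, only the four control commands do, which is why EXEC and DISCARD still work while everything else piles up in the queue.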
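
    /* Add a new command into the MULTI commands queue */
    void queueMultiCommand(client *c) {
        multiCmd *mc;
        int j;

        c->mstate.commands = zrealloc(c->mstate.commands,
                sizeof(multiCmd)*(c->mstate.count+1));
        mc = c->mstate.commands+c->mstate.count;
        mc->cmd = c->cmd;
        mc->argc = c->argc;
        mc->argv = zmalloc(sizeof(robj*)*c->argc);
        memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
        for (j = 0; j < c->argc; j++)
            incrRefCount(mc->argv[j]);
        c->mstate.count++;
    }

The function first grows the mstate.commands array to sizeof(multiCmd)*(c->mstate.count+1) bytes and points mc at the newly added slot. At this point there is room for the new command, but the slot has not been filled in yet. The function then uses mc to copy in cmd (the command), argc and argv (the arguments), increments the reference count of each argument because the queue now shares it, and finally increments count.
![](https://img-blog.csdn.net/20180303155336337?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZHVvZHVveGlvbmcyMDEx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)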
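The grow-by-one-slot pattern and the reference counting can be reproduced with the standard allocator. This is a sketch under assumptions: toy_obj, toy_cmd, toy_queue and both functions are hypothetical, and unlike zmalloc/zrealloc (which abort the server when memory runs out) the sketch reports allocation failure to the caller.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical reference-counted argument, standing in for robj. */
typedef struct toy_obj {
    int refcount;
} toy_obj;

typedef struct toy_cmd {
    toy_obj **argv;   /* private array holding shared argument pointers */
    int argc;
} toy_cmd;

typedef struct toy_queue {
    toy_cmd *commands;
    int count;
} toy_queue;

/* Append one command: grow the array by one slot, then fill the new slot.
 * Returns 0 on success, -1 on bad input or allocation failure
 * (the queue is left unchanged in that case). */
int toy_queue_push(toy_queue *q, toy_obj **argv, int argc) {
    if (argc <= 0) return -1;
    toy_obj **copy = malloc(sizeof(toy_obj *) * (size_t)argc);
    if (copy == NULL) return -1;
    toy_cmd *grown = realloc(q->commands,
                             sizeof(toy_cmd) * (size_t)(q->count + 1));
    if (grown == NULL) { free(copy); return -1; }
    q->commands = grown;

    toy_cmd *slot = q->commands + q->count;   /* address of the new last slot */
    memcpy(copy, argv, sizeof(toy_obj *) * (size_t)argc);
    for (int j = 0; j < argc; j++) copy[j]->refcount++;  /* queue shares each arg */
    slot->argv = copy;
    slot->argc = argc;
    q->count++;
    return 0;
}

/* Drop every queued command and give back the references taken above. */
void toy_queue_clear(toy_queue *q) {
    for (int j = 0; j < q->count; j++) {
        toy_cmd *mc = q->commands + j;
        for (int i = 0; i < mc->argc; i++) mc->argv[i]->refcount--;
        free(mc->argv);
    }
    free(q->commands);
    q->commands = NULL;
    q->count = 0;
}
```

Only the array of pointers is copied, not the arguments themselves. That is why every push must be balanced by a matching decrement when the queue is cleared, which is the job freeClientMultiState does in Redis.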
Executing a transaction
A transaction is executed with the EXEC command, which is implemented in multi.c:

    void execCommand(client *c) {
        int j;
        robj **orig_argv;
        int orig_argc;
        struct redisCommand *orig_cmd;
        int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
        int was_master = server.masterhost == NULL;

        if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"EXEC without MULTI");
            return;
        }

        /* Check if we need to abort the EXEC because:
         * 1) Some WATCHed key was touched.
         * 2) There was a previous error while queueing commands.
         * A failed EXEC in the first case returns a multi bulk nil object
         * (technically it is not an error but a special behavior), while
         * in the second an EXECABORT error is returned. */
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                      shared.nullmultibulk);
            discardTransaction(c);
            goto handle_monitor;
        }

        /* Exec all the queued commands */
        unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyMultiBulkLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;

            /* Propagate a MULTI request once we encounter the first command which
             * is not readonly nor an administrative one.
             * This way we'll deliver the MULTI/..../EXEC block as a whole and
             * both the AOF and the replication link will have the same consistency
             * and atomicity guarantees. */
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);
                must_propagate = 1;
            }

            call(c,CMD_CALL_FULL);

            /* Commands may alter argc/argv, restore mstate. */
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);

        /* Make sure the EXEC command will be propagated as well if MULTI
         * was already propagated. */
        if (must_propagate) {
            int is_master = server.masterhost == NULL;
            server.dirty++;
            /* If inside the MULTI/EXEC block this instance was suddenly
             * switched from master to slave (using the SLAVEOF command), the
             * initial MULTI was propagated into the replication backlog, but the
             * rest was not. We need to make sure to at least terminate the
             * backlog with the final EXEC. */
            if (server.repl_backlog && was_master && !is_master) {
                char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
                feedReplicationBacklog(execcmd,strlen(execcmd));
            }
        }

    handle_monitor:
        /* Send EXEC to clients waiting data from MONITOR. We do it here
         * since the natural order of commands execution is actually:
         * MULTI, EXEC, ... commands inside transaction ...
         * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
         * table, and we do it here with correct ordering. */
        if (listLength(server.monitors) && !server.loading)
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
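EXEC first checks c->flags & CLIENT_MULTI to make sure a transaction is open, because EXEC is only valid after MULTI. It then checks whether the client carries CLIENT_DIRTY_CAS or CLIENT_DIRTY_EXEC. CLIENT_DIRTY_CAS means a watched key was modified, and CLIENT_DIRTY_EXEC means an error was raised while queueing. In either case it sends the failure reply (a nil multi bulk for a touched key, an EXECABORT error for a queueing error), calls discardTransaction to close the transaction, and jumps to handle_monitor, which forwards the EXEC to any clients running MONITOR. If neither flag is set, it calls unwatchAllKeys to drop all of the client's watches before running the queue.
Next, the loop for (j = 0; j < c->mstate.count; j++) walks the queue, loads each command's arguments into the client, and executes it. Note that the commands run in order, and because Redis executes commands on a single thread, no other client's command can run in between, which is what makes the block atomic. In addition, when the loop reaches the first command that is neither read-only nor administrative, a MULTI is propagated to the AOF file and to the slaves, so that they receive the MULTI/EXEC block as a whole.
After all queued commands have run, discardTransaction is called to close the transaction. There is one special case: if the instance was switched from master to slave while the queue was executing, the initial MULTI has reached the replication backlog but the rest has not. The function therefore writes the final EXEC into the backlog with feedReplicationBacklog so that the backlog is terminated consistently.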
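The three possible outcomes of EXEC can be condensed into a small sketch. It is an illustration under assumptions: toy_client, toy_end_transaction and toy_exec are hypothetical, queued commands are reduced to a counter, replies are plain strings, and the two DIRTY bit positions are chosen for this example only.

```c
#include <assert.h>
#include <string.h>

#define TOY_CLIENT_MULTI (1<<3)   /* same bit position as CLIENT_MULTI */
#define TOY_DIRTY_CAS    (1<<5)   /* illustrative bit position */
#define TOY_DIRTY_EXEC   (1<<12)  /* illustrative bit position */

/* Hypothetical stand-in for client. */
typedef struct toy_client {
    int flags;
    int queued;      /* commands waiting for EXEC */
    int executed;    /* commands that EXEC has run so far */
} toy_client;

/* Stands in for discardTransaction: empty the queue, clear the flags. */
static void toy_end_transaction(toy_client *c) {
    c->queued = 0;
    c->flags &= ~(TOY_CLIENT_MULTI | TOY_DIRTY_CAS | TOY_DIRTY_EXEC);
}

/* Returns the reply text: an error, an abort reply, or "OK" after
 * running every queued command in order. */
const char *toy_exec(toy_client *c) {
    if (!(c->flags & TOY_CLIENT_MULTI)) return "ERR EXEC without MULTI";

    if (c->flags & (TOY_DIRTY_CAS | TOY_DIRTY_EXEC)) {
        /* A queueing error takes precedence over a touched key,
         * mirroring the ternary in execCommand. */
        const char *reply =
            (c->flags & TOY_DIRTY_EXEC) ? "EXECABORT" : "(nil)";
        toy_end_transaction(c);
        return reply;
    }

    for (int j = 0; j < c->queued; j++) c->executed++;  /* run in queue order */
    toy_end_transaction(c);
    return "OK";
}
```

Whichever branch is taken after the first check, the transaction state is cleared before returning, so the client is always back in normal mode once EXEC has replied.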
How Redis implements optimistic locking
Redis implements optimistic locking by combining the WATCH and UNWATCH commands with transactions. The source is in multi.c:

    void watchCommand(client *c) {
        int j;

        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"WATCH inside MULTI is not allowed");
            return;
        }
        for (j = 1; j < c->argc; j++)
            watchForKey(c,c->argv[j]);
        addReply(c,shared.ok);
    }

    void unwatchCommand(client *c) {
        unwatchAllKeys(c);
        c->flags &= (~CLIENT_DIRTY_CAS);
        addReply(c,shared.ok);
    }

    /* Watch for the specified key */
    void watchForKey(client *c, robj *key) {
        list *clients = NULL;
        listIter li;
        listNode *ln;
        watchedKey *wk;

        /* Check if we are already watching for this key */
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            wk = listNodeValue(ln);
            if (wk->db == c->db && equalStringObjects(key,wk->key))
                return; /* Key already watched */
        }
        /* This key is not already watched in this DB. Let's add it */
        clients = dictFetchValue(c->db->watched_keys,key);
        if (!clients) {
            clients = listCreate();
            dictAdd(c->db->watched_keys,key,clients);
            incrRefCount(key);
        }
        listAddNodeTail(clients,c);
        /* Add the new key to the list of keys watched by this client */
        wk = zmalloc(sizeof(*wk));
        wk->key = key;
        wk->db = c->db;
        incrRefCount(key);
        listAddNodeTail(c->watched_keys,wk);
    }

    /* Unwatch all the keys watched by this client. To clean the EXEC dirty
     * flag is up to the caller. */
    void unwatchAllKeys(client *c) {
        listIter li;
        listNode *ln;

        if (listLength(c->watched_keys) == 0) return;
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            list *clients;
            watchedKey *wk;

            /* Lookup the watched key -> clients list and remove the client
             * from the list */
            wk = listNodeValue(ln);
            clients = dictFetchValue(wk->db->watched_keys, wk->key);
            serverAssertWithInfo(c,NULL,clients != NULL);
            listDelNode(clients,listSearchKey(clients,c));
            /* Kill the entry at all if this was the only client */
            if (listLength(clients) == 0)
                dictDelete(wk->db->watched_keys, wk->key);
            /* Remove this watched key from the client->watched list */
            listDelNode(c->watched_keys,ln);
            decrRefCount(wk->key);
            zfree(wk);
        }
    }
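The source shows that WATCH maintains two data structures. One is the watched_keys list inside each client, which holds the keys that client is watching. The other is a dictionary in each database (c->db->watched_keys) whose keys are the watched key names and whose values are lists of all the clients watching that key.
![](https://img-blog.csdn.net/20180303155307798?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZHVvZHVveGlvbmcyMDEx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
Both watchForKey and unwatchAllKeys exist to maintain these two structures. In the previous section we saw that EXEC checks whether the client has CLIENT_DIRTY_CAS set. So how are these structures used to set and clear that flag? Let's keep reading the source.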

    /*-----------------------------------------------------------------------------
     * Hooks for key space changes.
     *
     * Every time a key in the database is modified the function
     * signalModifiedKey() is called.
     *
     * Every time a DB is flushed the function signalFlushDb() is called.
     *----------------------------------------------------------------------------*/
    void signalModifiedKey(redisDb *db, robj *key) {
        touchWatchedKey(db,key);
    }

    /* "Touch" a key, so that if this key is being WATCHed by some client the
     * next EXEC will fail. */
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;

        if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
        if (!clients) return;

        /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
        /* Check if we are already watching for this key */
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);

            c->flags |= CLIENT_DIRTY_CAS;
        }
    }

As the comment says, every time the value of a key is modified, signalModifiedKey is called. The touchWatchedKey function it calls looks up the list of clients watching that key and sets CLIENT_DIRTY_CAS in the flags of each of them.
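To see the key-to-clients index at work, here is a minimal sketch that models only the database-side structure (it leaves out the per-client list). It is built on assumptions: fixed-size arrays replace Redis's dict and list, and toy_client, toy_watch_entry, toy_db, toy_watch and toy_touch_key are all hypothetical names. The caller must keep the key strings alive, because only the pointers are stored.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_DIRTY_CAS (1<<5)   /* illustrative bit position */
#define MAX_KEYS      8
#define MAX_WATCHERS  8

/* Hypothetical stand-in for client. */
typedef struct toy_client {
    int flags;
} toy_client;

/* One watched key and the clients watching it. */
typedef struct toy_watch_entry {
    const char *key;
    toy_client *watchers[MAX_WATCHERS];
    int nwatchers;
} toy_watch_entry;

/* Stands in for db->watched_keys. */
typedef struct toy_db {
    toy_watch_entry entries[MAX_KEYS];
    int nentries;
} toy_db;

static toy_watch_entry *find_entry(toy_db *db, const char *key) {
    for (int i = 0; i < db->nentries; i++) {
        if (strcmp(db->entries[i].key, key) == 0) return &db->entries[i];
    }
    return NULL;
}

/* Register c as a watcher of key. Watching the same key twice is a no-op.
 * Returns 0 on success, -1 if one of the fixed-size tables is full. */
int toy_watch(toy_db *db, toy_client *c, const char *key) {
    toy_watch_entry *e = find_entry(db, key);
    if (e == NULL) {
        if (db->nentries == MAX_KEYS) return -1;
        e = &db->entries[db->nentries++];
        e->key = key;
        e->nwatchers = 0;
    }
    for (int i = 0; i < e->nwatchers; i++) {
        if (e->watchers[i] == c) return 0;    /* already watching */
    }
    if (e->nwatchers == MAX_WATCHERS) return -1;
    e->watchers[e->nwatchers++] = c;
    return 0;
}

/* Called when key is modified: flag every client that watches it. */
void toy_touch_key(toy_db *db, const char *key) {
    toy_watch_entry *e = find_entry(db, key);
    if (e == NULL) return;                    /* nobody watches this key */
    for (int i = 0; i < e->nwatchers; i++) {
        e->watchers[i]->flags |= TOY_DIRTY_CAS;
    }
}
```

The writer never needs to know who is watching: it only reports that a key changed, and the index finds the affected clients. Each flagged client then discovers the conflict when its own EXEC checks the flag, which is the optimistic part of the scheme.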
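Afterword
Although these commands are enough to implement transactions in Redis, in day-to-day development we usually write a Lua script instead, because everything in a Lua script runs as a single atomic unit. That's all for this article. Discussion is welcome.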