您的位置:首页 > 数据库 > Redis

Redis 事务机制实现

2014-09-26 11:37 239 查看
事务提供了一种“将多个命令打包,然后一次性、按顺序地执行”的机制,它提供两个保证:
• 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
• 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行
MULTI,EXEC,DISCARD,WATCH 四个命令是 redis 事务的四个基础命令。其中:
MULTI,告诉 redis 服务器开启一个事务。注意,只是开启,而不是执行
EXEC,告诉 redis 开始执行事务
DISCARD,告诉 redis 取消事务
WATCH,监视某一个键值对,它的作用是在事务执行之前如果监视的键值被修改,事务会被取消。
以下是一个事务的例子,它先以 MULTI 开始一个事务,然后将多个命令入队到事务中,最后由 EXEC 命令触发事务,一并执行事务中的所有命令:
redis> MULTI
OK
redis> SET book-name "Mastering C++ in 21 days"
QUEUED
redis> GET book-name
QUEUED
redis> SADD tag "C++" "Programming" "Mastering Series"
QUEUED
redis> SMEMBERS tag
QUEUED
redis> EXEC
1) OK
2) "Mastering C++ in 21 days"
3) (integer) 3
4) 1) "Mastering Series"
2) "C++"
3) "Programming"
一个事务从开始到执行会经历以下三个阶段:
1. 开始事务。
2. 命令入队。
3. 执行事务。

MULTI(开始事务)

当接受到客户端发送的multi命令后,processCommand函数会解析该命令,并调用multiCommand
1: /*
2:  * MULTI 命令的实现
3:  *
4:  * 打开客户端的 FLAG ,让命令入队到事务队列里
5:  *
6:  * T = O(1)
7:  */
8: void multiCommand(redisClient *c) {
9:     // MULTI 命令不能嵌套
10:     if (c->flags & REDIS_MULTI) {
11:         addReplyError(c,"MULTI calls can not be nested");
12:         return;
13:     }
14:
15:     // 打开事务的 FLAG
16:     // 从此之后,除 DISCARD 和 EXEC 等少数几个命令之外
17:     // 其他所有的命令都会被添加到事务队列里
18:     c->flags |= REDIS_MULTI;
19:     addReply(c,shared.ok);
20: }


此函数将rediClient客户端对象的flags置为REDIS_MULTI,并对客户端返回OK,表示事务机制开始

命令入队

客户端输入名利multi并得到返回后,便可以继续输入要执行的命令了,此时redis在processCommand中会对命令进行处理

1: // 执行命令
2: int processCommand(redisClient *c) {
3:     ......
4:
5:     // 加入命令队列的情况
6:     /* Exec the command */
7:     if (c->flags & REDIS_MULTI &&
8:         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
9:         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
10:     {
11:         // 命令入队
12:         queueMultiCommand(c);
13:         addReply(c,shared.queued);
14:
15:     // 真正执行命令。
16:     // 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
17:     } else {
18:         call(c,REDIS_CALL_FULL);
19:         if (listLength(server.ready_keys))
20:             handleClientsBlockedOnLists();
21:     }
22:     return REDIS_OK;
23: }


在此函数中,redis首先检查客户端的flags是否为REDIS_MULTI,并且命令不是exec、discard、multi、watch,如果不是这些命令,便将命令加入到队列中

在redisClient中保存着事务机制所需的命令队列

1: // 命令结构体,命令队列专用
2: /* Client MULTI/EXEC state */
3: typedef struct multiCmd {
4:     // 命令参数
5:     robj **argv;
6:
7:     // 参数个数
8:     int argc;
9:
10:     // 命令结构体,包含了与命令相关的参数,譬如命令执行函数
11:     // 如需更详细了解,参看 redis.c 中的 redisCommandTable 全局参数
12:     struct redisCommand *cmd;
13: } multiCmd;
14:
15: // 命令队列结构体
16: typedef struct multiState {
17:     // 命令队列
18:     multiCmd *commands;     /* Array of MULTI commands */
19:
20:     // 命令的个数
21:     int count;              /* Total number of MULTI commands */
22:
23:     // 以下两个参数暂时没有用到,和主从复制有关
24:     int minreplicas;        /* MINREPLICAS for synchronous replication */
25:     time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
26: } multiState;


执行事务(exec)

当用户发出exec命令后,它在multi之后发送的命令都会被执行,下***体看一下execCommand执行了哪些工作

1: /*
2:  * EXEC 命令的实现
3:  */
4: void execCommand(redisClient *c) {
5:     int j;
6:     // 用于保存执行命令、命令的参数和参数数量的副本
7:     robj **orig_argv;
8:     int orig_argc;
9:     struct redisCommand *orig_cmd;
10:
11:     // 只能在 MULTI 已启用的情况下执行
12:     if (!(c->flags & REDIS_MULTI)) {
13:         addReplyError(c,"EXEC without MULTI");
14:         return;
15:     }
16:
17:     /* Check if we need to abort the EXEC because:
18:      * 以下情况发生时,取消事务
19:      *
20:      * 1) Some WATCHed key was touched.
21:      *    某些被监视的键已被修改(状态为 REDIS_DIRTY_CAS)
22:      *
23:      * 2) There was a previous error while queueing commands.
24:      *    有命令在入队时发生错误(状态为 REDIS_DIRTY_EXEC)
25:      *
26:      * A failed EXEC in the first case returns a multi bulk nil object
27:      * (technically it is not an error but a special behavior), while
28:      * in the second an EXECABORT error is returned.
29:      *
30:      * 第一种情况返回多个空白 NULL 对象,
31:      * 第二种情况返回一个 EXECABORT 错误。
32:      */
33:     if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
34:         // 根据状态,决定返回的错误的类型
35:         addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
36:                                                   shared.nullmultibulk);
37:
38:         // 以下四句可以用 discardTransaction() 来替换
39:         freeClientMultiState(c);
40:         initClientMultiState(c);
41:         c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
42:         unwatchAllKeys(c);
43:
44:         goto handle_monitor;
45:     }
46:
47:     /* Replicate a MULTI request now that we are sure the block is executed.
48:      * This way we'll deliver the MULTI/..../EXEC block as a whole and
49:      * both the AOF and the replication link will have the same consistency
50:      * and atomicity guarantees. */
51:     // 向所有附属节点和 AOF 文件发送 MULTI 命令
52:     execCommandReplicateMulti(c);
53:
54:     /* Exec all the queued commands */
55:     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
56:
57:     // 将三个原始参数备份起来
58:     orig_argv = c->argv;
59:     orig_argc = c->argc;
60:     orig_cmd = c->cmd;
61:     addReplyMultiBulkLen(c,c->mstate.count);
62:     // 执行所有入队的命令
63:     for (j = 0; j < c->mstate.count; j++) {
64:         // 因为 call 可能修改命令,而命令需要传送给其他同步节点
65:         // 所以这里将要执行的命令(及其参数)先备份起来
66:         c->argc = c->mstate.commands[j].argc;
67:         c->argv = c->mstate.commands[j].argv;
68:         c->cmd = c->mstate.commands[j].cmd;
69:
70:         // 执行命令
71:         call(c,REDIS_CALL_FULL);
72:
73:         /* Commands may alter argc/argv, restore mstate. */
74:         // 还原原始的参数到队列里
75:         c->mstate.commands[j].argc = c->argc;
76:         c->mstate.commands[j].argv = c->argv;
77:         c->mstate.commands[j].cmd = c->cmd;
78:     }
79:     // 还原三个原始命令
80:     c->argv = orig_argv;
81:     c->argc = orig_argc;
82:     c->cmd = orig_cmd;
83:
84:     // 以下三句也可以用 discardTransaction() 来替换
85:     freeClientMultiState(c);
86:     initClientMultiState(c);
87:     c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
88:     /* Make sure the EXEC command is always replicated / AOF, since we
89:      * always send the MULTI command (we can't know beforehand if the
90:      * next operations will contain at least a modification to the DB). */
91:     server.dirty++;
92:
93: handle_monitor:
94:     /* Send EXEC to clients waiting data from MONITOR. We do it here
95:      * since the natural order of commands execution is actually:
96:      * MUTLI, EXEC, ... commands inside transaction ...
97:      * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
98:      * table, and we do it here with correct ordering. */
99:     // 向同步节点发送命令
100:     if (listLength(server.monitors) && !server.loading)
101:         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
102: }
103:


1.execCommand函数检查客户端对象的flags是否为REDIS_MULTI,即检查是否处于事务机制,如果不是则返回错误

2.同样检查flags标志是否被设置为REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC,REDIS_DIRTY_CAS标志之后在WATCH命令中讲解,而REDIS_DIRTY_EXEC标志表示入队的命令出现了错误,导致执行命令时出现错误。如果以上两个标志被设置,就会调用discardTransaction函数清空命令队列,取消客户端对象已经设置的事务机制相关标志

3.调用unwatchAllKeys取消对键的监视

4.循环执行命令队列中的命令

此函数中用到的REDIS_DIRTY_EXEC标志,是在开启事务机制并且受到客户端命令时调用函数processCommand中检查并设置的

1:  // 获取要执行的命令,
2:     // 并对命令、命令参数和命令参数的数量进行检查
3:     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
4:     if (!c->cmd) {
5:         // 命令没找,出错
6:         flagTransaction(c);//此处设置REDIS_DIRTY_EXEC标志
7:         addReplyErrorFormat(c,"unknown command '%s'",
8:             (char*)c->argv[0]->ptr);
9:         return REDIS_OK;
10:     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
11:                (c->argc < -c->cmd->arity)) {
12:         // 命令参数个数出错
13:         flagTransaction(c);
14:         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
15:             c->cmd->name);
16:         return REDIS_OK;
17:     }


键值的监视(watch)

watch命令用于在事务机制开始之前对键值进行监视,如果在执行exec命令之前,键值被其他客户端修改了,那么exec就不会执行,并会直接返回失败

WATCH 命令是为了让 redis 拥有 check-and-set(CAS)的特性。CAS 的意思是,一个客户端在修改某个值之前,要检测它是否更改;如果没有更改,修改操作才能成功

当执行exec命令时,不管事务是否成功,都会取消对所有键值的监视

watch命令的实现

每一个代表数据结构的redisDB结构体中都会有一个watched_keys字典,这个字典的键是该数据库中被监视的键,而值是一个链表,保存了所有监视该键的客户端

如下图所示:





而且每一个客户端对象redisClient中会维持一个链表,链表中的元素为watchedKEY结构体,结构体定义如下:

1: /* In the client->watched_keys list we need to use watchedKey structures
2:  * as in order to identify a key in Redis we need both the key name and the
3:  * DB */
4: /*
5:  * 被监视的 key 的资料
6:  */
7: typedef struct watchedKey {
8:     // 被监视的 key
9:     robj *key;
10:     // key 所在的数据库
11:     redisDb *db;
12: } watchedKey;


这个结构体保存了此客户端监视的键的信息

下***体分析,当收到watch命令后会执行的watchCommand函数

1: /*
2:  * 将所有输入键添加到监视列表当中
3:  *
4:  * T = O(N^2)
5:  */
6: void watchCommand(redisClient *c) {
7:     int j;
8:
9:     // 不能在事务中使用
10:     if (c->flags & REDIS_MULTI) {
11:         addReplyError(c,"WATCH inside MULTI is not allowed");
12:         return;
13:     }
14:
15:     // 监视所有 key ,O(N^2)
16:     for (j = 1; j < c->argc; j++)
17:         // O(N)
18:         watchForKey(c,c->argv[j]);
19:
20:     addReply(c,shared.ok);
21: }


此函数首先检查客户端是否被设置了REDIS_MULTI标志,如果没有说明此客户端并没有执行事务,返回错误

之后调用watchForKey函数监视所有键值

1: /* Watch for the specified key */
2: /*
3:  * 监视给定 key
4:  *
5:  * T = O(N)
6:  */
7: void watchForKey(redisClient *c, robj *key) {
8:     list *clients = NULL;
9:     listIter li;
10:     listNode *ln;
11:     watchedKey *wk;
12:
13:     /* Check if we are already watching for this key */
14:     // 检查该 key 是否已经被 WATCH
15:     // (出现在 WATCH 命令调用时一个 key 被输入多次的情况)
16:     // 如果是的话,直接返回
17:     // O(N)
18:     listRewind(c->watched_keys,&li);
19:     while((ln = listNext(&li))) {
20:         wk = listNodeValue(ln);
21:         if (wk->db == c->db && equalStringObjects(key,wk->key))
22:             return; /* Key already watched */
23:     }
24:
25:     // key 未被监视
26:     // 根据 key ,将客户端加入到 DB 的监视 key 字典中
27:     /* This key is not already watched in this DB. Let's add it */
28:     // O(1)
29:     clients = dictFetchValue(c->db->watched_keys,key);
30:     if (!clients) {
31:         clients = listCreate();
32:         dictAdd(c->db->watched_keys,key,clients);
33:         incrRefCount(key);
34:     }
35:     listAddNodeTail(clients,c);
36:
37:     // 将 key 添加到客户端的监视列表中
38:     /* Add the new key to the lits of keys watched by this client */
39:     // O(1)
40:     wk = zmalloc(sizeof(*wk));
41:     wk->key = key;
42:     wk->db = c->db;
43:     incrRefCount(key);
44:     listAddNodeTail(c->watched_keys,wk);
45: }
46:


此函数执行以下工作:

1.在客户端的watched_keys链表中是否已经包含要监视的键,如果有则直接返回

2.如果在客户端的监视链表中没有找到此键,就在数据库的watched_keys字典中查找,如果该键已经存在,就将此客户端结构添加在该键对应的链表末尾

如果没有该键,就将该键添加到此字典中

3.创建一个watchedKEY结构,将此结构添加到客户端的监听链表中

以上便完成了watch命令的初始工作

 

当数据库中执行一个修改操作时,会调用函数signalModifiedKey(),当执行flushDB操作时,会调用signalFlushedDb()。这两个函数都会检查数据库的watched_keys字典中是否有客户端监视这个要修改的key,touchWatchedKey()如果发现被修改的key在watched_keys中,那么对应的客户端们状态都被置为REDIS_DIRTY_CAS,之后执行exec命令会失败

1: void signalModifiedKey(redisDb *db, robj *key) {
2:     touchWatchedKey(db,key);
3: }
4:
5: void signalFlushedDb(int dbid) {
6:     touchWatchedKeysOnFlush(dbid);
7: }


1: /* "Touch" a key, so that if this key is being WATCHed by some client the
2:  * next EXEC will fail. */
3: /*
4:  * “碰触”(touch)给定 key ,如果这个 key 正在被监视的话,
5:  * 让监视它的客户端在执行 EXEC 命令时失败。
6:  *
7:  * T = O(N)
8:  */
9: void touchWatchedKey(redisDb *db, robj *key) {
10:     list *clients;
11:     listIter li;
12:     listNode *ln;
13:
14:     // 如果数据库中没有任何 key 被监视,那么直接返回
15:     if (dictSize(db->watched_keys) == 0) return;
16:
17:     // 取出数据库中所有监视给定 key 的客户端
18:     clients = dictFetchValue(db->watched_keys, key);
19:     if (!clients) return;
20:
21:     /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
22:     /* Check if we are already watching for this key */
23:     // 打开所有监视这个 key 的客户端的 REDIS_DIRTY_CAS 状态
24:     // O(N)
25:     listRewind(clients,&li);
26:     while((ln = listNext(&li))) {
27:         redisClient *c = listNodeValue(ln);
28:
29:         c->flags |= REDIS_DIRTY_CAS;
30:     }
31: }


此函数会将所有监听该key的客户端结构体的flags置为REDIS_DIRTY_CAS,这样在执行exec命令时,如果检查到该标志,就会放弃执行命令
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: