
Redis Tutorial: A Source-Code Analysis of the Transaction Mechanism

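2018-03-03 15:55
Abstract: Redis supports simple transactions. The MULTI, DISCARD, EXEC, WATCH and UNWATCH commands cover many transactional scenarios. In the article "Redis Transaction Mechanism: Basic Introduction" I covered the basic usage of Redis transactions; this article walks through the source code to analyze how they are actually implemented. Corrections are welcome.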

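Preface

Redis is written in C, so readers of this article need a basic grounding in C. The relevant source files are in the src folder under the Redis root directory.

Basic data structures

The client struct, defined in the header server.h, describes each client connected to the server. Its source is as follows: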

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    sds pending_querybuf;   /* If this is a master, this buffer represents the
                               yet not applied replication stream that we
                               are receiving from the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    int authenticated;      /* When requirepass is non-NULL. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a
                               master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;


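Three of these fields matter for this article. flags holds the client's state bits, and the following bit marks the client as being inside a transaction: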

#define CLIENT_MULTI (1<<3)   /* This client is in a MULTI context */
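
As a minimal sketch of how a single bit such as CLIENT_MULTI is set, tested and cleared, the standalone snippet below may help. The demo_client struct and every DEMO_/demo_ name are hypothetical and exist only for this illustration; only the bit position is taken from the definition above.

```c
#include <stdbool.h>

/* Same bit position as CLIENT_MULTI above; the DEMO_ name is hypothetical. */
#define DEMO_CLIENT_MULTI (1<<3)

/* Hypothetical stand-in for the much larger Redis client struct. */
typedef struct demo_client {
    int flags; /* Bitwise OR of state flags. */
} demo_client;

/* Returns true if the client is inside a MULTI context. */
static bool demo_in_multi(const demo_client *c) {
    return (c->flags & DEMO_CLIENT_MULTI) != 0;
}

/* Sets the flag without disturbing any other bit. */
static void demo_enter_multi(demo_client *c) {
    c->flags |= DEMO_CLIENT_MULTI;
}

/* Clears only this flag and leaves the other bits as they were. */
static void demo_leave_multi(demo_client *c) {
    c->flags &= ~DEMO_CLIENT_MULTI;
}
```

Because each state owns one bit, a client can carry several states at once in the same int.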


The cmd and lastcmd fields describe the current command and the last command executed. Their type is defined as follows:

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};


In this struct, the proc field points to the function that implements the command.

typedef void redisCommandProc(client *c);
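
To illustrate dispatching through a function pointer like proc, here is a small sketch. The table, the two commands and the linear lookup are assumptions made for the example (all demo_ names are hypothetical); how Redis itself looks commands up is not shown in this article.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical client that only counts what was called on it. */
typedef struct demo_client { int calls; } demo_client;

/* Same shape as redisCommandProc: a function that receives the client. */
typedef void demo_command_proc(demo_client *c);

typedef struct demo_command {
    const char *name;
    demo_command_proc *proc; /* Implementation of the command. */
} demo_command;

static void demo_ping(demo_client *c) { c->calls += 1; }
static void demo_echo(demo_client *c) { c->calls += 10; }

/* Illustrative command table; the entries are invented for this sketch. */
static demo_command demo_table[] = {
    {"ping", demo_ping},
    {"echo", demo_echo},
};

/* Linear search by name; returns NULL for an unknown command. */
static demo_command *demo_lookup(const char *name) {
    size_t n = sizeof(demo_table) / sizeof(demo_table[0]);
    for (size_t i = 0; i < n; i++)
        if (strcmp(demo_table[i].name, name) == 0) return &demo_table[i];
    return NULL;
}
```

A caller would run a command with `cmd->proc(&client)`. Comparing `cmd->proc` against a known function is also how the queueing check later in this article recognizes EXEC, DISCARD, MULTI and WATCH.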


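The other struct we need to know is multiState, which describes the queued command sequence. It is defined in server.h (the functions that operate on it are in multi.c). Its source is as follows: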

typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;


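multiState describes the set of commands in a transaction, including commands (an array of multiCmd entries, one per queued command) and count (the number of queued commands).

Opening and closing a transaction

A transaction is opened by executing the MULTI command. Its implementation is in multi.c: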

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}


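As the code shows, the server first checks whether the client already has the CLIENT_MULTI flag. If it does, an error is returned, which explains why MULTI cannot be called inside a transaction. Otherwise the CLIENT_MULTI flag is added to the client's flags and OK is returned. Because the flag lives on the client, an open transaction applies to the whole connection.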



So how is closing a transaction triggered? Normally a transaction is closed by hand with the DISCARD command. Its implementation is in multi.c:

void discardCommand(client *c) {
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    discardTransaction(c);
    addReply(c,shared.ok);
}


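This code looks a lot like the MULTI source above. It first checks whether the client has the CLIENT_MULTI flag; if not, it returns an error, because DISCARD is only valid inside a transaction. After the check, the discardTransaction function ends the transaction.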



Why is it done this way? Let's look at the source. discardTransaction and the functions it calls all live in multi.c, as follows (unwatchAllKeys is analyzed in a later section):

void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}


/* Release all the resources associated with MULTI/EXEC state */
void freeClientMultiState(client *c) {
    int j;

    for (j = 0; j < c->mstate.count; j++) {
        int i;
        multiCmd *mc = c->mstate.commands+j;

        for (i = 0; i < mc->argc; i++)
            decrRefCount(mc->argv[i]);
        zfree(mc->argv);
    }
    zfree(c->mstate.commands);
}


/* Client state initialization for MULTI/EXEC */
void initClientMultiState(client *c) {
    c->mstate.commands = NULL;
    c->mstate.count = 0;
}


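So closing a transaction takes more work than opening one: the memory held by the queued commands must be released (otherwise it would leak), the state fields must be reset, the transaction flags must be cleared, and the watched keys must be released. Putting these steps in one function makes them easy to reuse (execCommand calls it as well) and keeps the code tidy.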
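
The release-then-reset pattern can be sketched in plain C as follows. This is a simplified model under stated assumptions: demo_obj is a hypothetical reference-counted object standing in for robj, and malloc/free stand in for the Redis allocator.

```c
#include <stdlib.h>

/* Hypothetical reference-counted object, standing in for robj. */
typedef struct demo_obj { int refcount; } demo_obj;

typedef struct demo_cmd { demo_obj **argv; int argc; } demo_cmd;

typedef struct demo_state { demo_cmd *commands; int count; } demo_state;

/* Drops one reference and frees the object when none are left. */
static void demo_decr(demo_obj *o) {
    if (--o->refcount == 0) free(o);
}

/* Releases every queued argument, each argv array, then the queue itself. */
static void demo_free_state(demo_state *s) {
    for (int j = 0; j < s->count; j++) {
        demo_cmd *mc = s->commands + j;
        for (int i = 0; i < mc->argc; i++) demo_decr(mc->argv[i]);
        free(mc->argv);
    }
    free(s->commands);
}

/* Puts the state back to "empty queue" so it can be reused safely. */
static void demo_init_state(demo_state *s) {
    s->commands = NULL;
    s->count = 0;
}

/* Free first, then reset: without the reset, commands would dangle. */
static void demo_discard(demo_state *s) {
    demo_free_state(s);
    demo_init_state(s);
}
```

The queue only drops its own references, so an argument that is still referenced elsewhere survives the discard.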

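How commands are queued

Once a transaction is open, commands are not executed right away but saved in a queue. If an error occurs while a command is being queued, the transaction is flagged with CLIENT_DIRTY_EXEC, and the later EXEC refuses to run with an EXECABORT error and discards the queue. So how does queueing work in Redis? Below is the part of the Redis command execution code that deals with transactions.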

......
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
......


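While processing a command, if the client is in a transaction and the command is not EXEC, DISCARD, MULTI or WATCH, queueMultiCommand is called to queue the command and QUEUED is returned. Otherwise call executes the command, the master replication offset is recorded in c->woff (the principles of Redis master-slave replication will be covered in a separate article), and clients blocked on lists that have received new elements are served (BLPOP blocks when the list is empty, and ready_keys records the keys that have since received data). The source shows that queueMultiCommand is the queueing function, so let's look at its implementation.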
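
The queue-or-execute decision can be reduced to a small predicate. In this sketch the demo_kind enum is a hypothetical replacement for comparing c->cmd->proc against the four command functions; only the CLIENT_MULTI bit value comes from the source above.

```c
#include <stdbool.h>

/* Same bit position as CLIENT_MULTI; the DEMO_ name is hypothetical. */
#define DEMO_CLIENT_MULTI (1<<3)

/* Hypothetical tag replacing the proc pointer comparisons. */
typedef enum {
    DEMO_EXEC,
    DEMO_DISCARD,
    DEMO_MULTI,
    DEMO_WATCH,
    DEMO_OTHER /* Any ordinary command, e.g. SET or GET. */
} demo_kind;

/* Returns true when the command should be queued instead of executed. */
static bool demo_should_queue(int client_flags, demo_kind kind) {
    /* Outside a transaction everything runs immediately. */
    if (!(client_flags & DEMO_CLIENT_MULTI)) return false;
    /* The four transaction-control commands always run immediately. */
    return kind == DEMO_OTHER;
}
```

The four control commands must bypass the queue because each needs to act on the transaction itself: EXEC and DISCARD end it, while MULTI and WATCH reply with an error when called inside one.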

/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
}


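The function first grows mstate.commands to sizeof(multiCmd)*(c->mstate.count+1) bytes and points mc at the newly added slot. At this point the slot for the new command exists but is not yet filled in. The function then fills in cmd (the command), argc and argv (the arguments) through mc: the argv pointer array is copied and each argument's reference count is incremented, so the arguments stay alive after the client's own argv is released. Finally the command counter is incremented.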
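
The same grow-by-one append can be sketched with the standard allocator. The assumptions here: demo_obj is a hypothetical reference-counted object, and realloc/malloc replace zrealloc/zmalloc, so the sketch reports an allocation failure with a return value.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical reference-counted object, standing in for robj. */
typedef struct demo_obj { int refcount; } demo_obj;

typedef struct demo_cmd { demo_obj **argv; int argc; } demo_cmd;

typedef struct demo_state { demo_cmd *commands; int count; } demo_state;

/* Appends one command; returns 0 on success, -1 on allocation failure. */
static int demo_queue(demo_state *s, demo_obj **argv, int argc) {
    /* Grow the array by exactly one slot. realloc(NULL, n) acts like malloc. */
    demo_cmd *grown = realloc(s->commands, sizeof(demo_cmd) * (s->count + 1));
    if (grown == NULL) return -1;
    s->commands = grown;

    /* Copy the pointer array so the queue does not alias the caller's argv. */
    demo_obj **copy = malloc(sizeof(demo_obj *) * argc);
    if (copy == NULL) return -1;
    memcpy(copy, argv, sizeof(demo_obj *) * argc);

    /* Fill the new slot, which sits at index count. */
    demo_cmd *mc = s->commands + s->count;
    mc->argc = argc;
    mc->argv = copy;

    /* The queue now shares each argument, so take one reference apiece. */
    for (int j = 0; j < argc; j++) mc->argv[j]->refcount++;
    s->count++;
    return 0;
}
```

Only the pointers are copied, not the argument objects, which is why the reference counts have to go up.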



Executing a transaction

A transaction is executed with the EXEC command. Its implementation is in multi.c:

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;

    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        call(c,CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}


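The command first evaluates c->flags & CLIENT_MULTI to confirm that a transaction is open, because EXEC is only valid inside a transaction. It then tests whether the client carries CLIENT_DIRTY_CAS or CLIENT_DIRTY_EXEC. CLIENT_DIRTY_CAS means a WATCHed key was modified, and CLIENT_DIRTY_EXEC means an error occurred while queueing commands. In either case discardTransaction closes the transaction, and control jumps to handle_monitor, which feeds the EXEC command to any clients running MONITOR. If neither flag is set, unwatchAllKeys removes all of the client's watches.
Next, the loop for (j = 0; j < c->mstate.count; j++) walks every queued command, installs its argc, argv and cmd on the client, and executes it. Note that the commands run in order, and because Redis executes commands on a single thread, no other client's command can run in between, so the queued commands execute as one atomic unit. In addition, on reaching the first command that is neither read-only nor administrative, execCommandPropagateMulti propagates a MULTI to the AOF file and to the slaves, so that the MULTI/EXEC block is delivered as a whole.
After all queued commands have run, discardTransaction closes the transaction. There is one special case: if the instance was switched from master to slave while the block was executing (with the SLAVEOF command), the initial MULTI is already in the replication backlog but the rest was not propagated. The feedReplicationBacklog function is therefore called to write a final EXEC into the backlog, terminating the block and keeping the replication stream consistent.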
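
The checks at the top of execCommand can be summarized as a small decision function. This is only a sketch: the bit positions chosen for the two DIRTY flags are assumptions made for the example (the article does not show their definitions), and the demo_ names are hypothetical.

```c
/* CLIENT_MULTI matches the definition shown earlier in this article. The
 * two DIRTY bit positions are assumed values for this sketch only. */
#define DEMO_CLIENT_MULTI      (1<<3)
#define DEMO_CLIENT_DIRTY_CAS  (1<<5)
#define DEMO_CLIENT_DIRTY_EXEC (1<<12)

typedef enum {
    DEMO_ERR_NO_MULTI, /* Error reply: EXEC without MULTI. */
    DEMO_EXECABORT,    /* A queueing error happened: EXECABORT error. */
    DEMO_NIL,          /* A watched key was touched: nil multi bulk. */
    DEMO_RUN           /* Run the queued commands in order. */
} demo_exec_outcome;

/* Mirrors the order of the checks in execCommand. */
static demo_exec_outcome demo_exec_outcome_for(int flags) {
    if (!(flags & DEMO_CLIENT_MULTI)) return DEMO_ERR_NO_MULTI;
    /* DIRTY_EXEC wins when both dirty flags are set, as in the ternary. */
    if (flags & DEMO_CLIENT_DIRTY_EXEC) return DEMO_EXECABORT;
    if (flags & DEMO_CLIENT_DIRTY_CAS) return DEMO_NIL;
    return DEMO_RUN;
}
```

The nil reply is what lets a client tell "someone modified my watched key, retry" apart from a real error.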

How Redis implements optimistic locking

Redis implements optimistic locking by combining the WATCH and UNWATCH commands with transactions. The source is in multi.c:

void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

void unwatchCommand(client *c) {
    unwatchAllKeys(c);
    c->flags &= (~CLIENT_DIRTY_CAS);
    addReply(c,shared.ok);
}

/* Watch for the specified key */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;

    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}


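The source shows that Redis keeps two data structures for watched keys. The first is the per-client list watched_keys, which holds the keys that client is watching (each entry records the key and its database). The second is a per-database dict, also named watched_keys, whose keys are the watched key names and whose values are lists of all the clients watching that key.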
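
A simplified model of this two-way bookkeeping is sketched below. It assumes fixed-size arrays and string keys in place of the Redis list and dict types, and a single database; every demo_ name is hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define DEMO_MAX_KEYS 8
#define DEMO_MAX_CLIENTS 8

/* Client side: the keys this client is watching. */
typedef struct demo_client {
    int id;
    const char *watched[DEMO_MAX_KEYS];
    int nwatched;
} demo_client;

/* Database side: one entry per watched key, listing its watchers. */
typedef struct demo_entry {
    const char *key;
    demo_client *clients[DEMO_MAX_CLIENTS];
    int nclients;
} demo_entry;

typedef struct demo_db {
    demo_entry entries[DEMO_MAX_KEYS];
    int nentries;
} demo_db;

/* Finds the database entry for a key, or NULL if nobody watches it. */
static demo_entry *demo_find(demo_db *db, const char *key) {
    for (int i = 0; i < db->nentries; i++)
        if (strcmp(db->entries[i].key, key) == 0) return &db->entries[i];
    return NULL;
}

/* Registers the watch in both directions. Watching the same key twice is
 * a no-op. Returns false only when one of the fixed arrays is full. */
static bool demo_watch(demo_db *db, demo_client *c, const char *key) {
    for (int i = 0; i < c->nwatched; i++)
        if (strcmp(c->watched[i], key) == 0) return true;
    if (c->nwatched == DEMO_MAX_KEYS) return false;

    demo_entry *e = demo_find(db, key);
    if (e == NULL) {
        if (db->nentries == DEMO_MAX_KEYS) return false;
        e = &db->entries[db->nentries++];
        e->key = key;
        e->nclients = 0;
    }
    if (e->nclients == DEMO_MAX_CLIENTS) return false;

    e->clients[e->nclients++] = c;   /* key -> clients direction */
    c->watched[c->nwatched++] = key; /* client -> keys direction */
    return true;
}
```

Keeping both directions means a write can find its watchers from the key, and a disconnecting or unwatching client can find its keys without scanning the whole database.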



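The functions watchForKey and unwatchAllKeys both maintain these two structures. In the previous section we saw that EXEC checks whether the client carries the CLIENT_DIRTY_CAS flag. That raises a question: how are these structures used to set and clear CLIENT_DIRTY_CAS? Let's keep reading the source.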

/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}


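The comments tell us that every time a key is modified, signalModifiedKey is called. The touchWatchedKey function it calls looks up the list of all clients watching that key and sets CLIENT_DIRTY_CAS in each client's flags. The flag is cleared again by UNWATCH and by discardTransaction, as shown earlier.

Afterword

Although these commands are enough to implement transactions in Redis, in practice we often write Lua scripts instead, because an entire Lua script is executed atomically, as if it were a single transaction. That is all for this article; discussion is welcome.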
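
Putting the pieces together, the optimistic-lock flow (watch, concurrent write, failed EXEC) might be modelled as below. This is a deliberately tiny sketch: one integer key with a fixed watcher array, a hypothetical demo_exec_set that stands for "EXEC of a transaction containing one write", and an assumed bit position for the dirty flag.

```c
/* Assumed bit position, used only inside this sketch. */
#define DEMO_CLIENT_DIRTY_CAS (1<<5)
#define DEMO_MAX_WATCHERS 8

typedef struct demo_client { int flags; } demo_client;

/* One key together with the clients currently watching it. */
typedef struct demo_key {
    int value;
    demo_client *watchers[DEMO_MAX_WATCHERS];
    int nwatchers;
} demo_key;

/* Starts watching; returns 0 if the watcher array is full. */
static int demo_watch(demo_key *k, demo_client *c) {
    if (k->nwatchers == DEMO_MAX_WATCHERS) return 0;
    k->watchers[k->nwatchers++] = c;
    return 1;
}

/* Stops watching by moving the last watcher into the freed slot. */
static void demo_unwatch(demo_key *k, demo_client *c) {
    for (int i = 0; i < k->nwatchers; i++) {
        if (k->watchers[i] == c) {
            k->watchers[i] = k->watchers[--k->nwatchers];
            return;
        }
    }
}

/* Marks every current watcher dirty, in the spirit of touchWatchedKey. */
static void demo_touch(demo_key *k) {
    for (int i = 0; i < k->nwatchers; i++)
        k->watchers[i]->flags |= DEMO_CLIENT_DIRTY_CAS;
}

/* Every write goes through here, so watchers are always signalled. */
static void demo_set(demo_key *k, int value) {
    k->value = value;
    demo_touch(k);
}

/* Returns 1 if the write was applied, 0 if it was aborted because the key
 * was touched after this client started watching it. */
static int demo_exec_set(demo_client *c, demo_key *k, int value) {
    int dirty = c->flags & DEMO_CLIENT_DIRTY_CAS;
    demo_unwatch(k, c);                  /* Unwatch before writing. */
    c->flags &= ~DEMO_CLIENT_DIRTY_CAS;  /* The flag is consumed here. */
    if (dirty) return 0;
    demo_set(k, value);
    return 1;
}
```

Unwatching before the write matters: otherwise the client's own write would mark it dirty. A client whose demo_exec_set returns 0 would normally watch again, re-read the value and retry.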
