您的位置:首页 > 数据库 > Redis

结合redis设计与实现的redis源码学习-22-集群(cluster.c)

2017-12-06 23:51 716 查看
Redis集群是Redis提供的分布式数据库方案,集群通过分片(sharding)来进行数据共享,并提供复制和故障转移功能。

一、节点

一个Redis集群通常由多个节点组成,在刚开始的时候,每个节点都是独立的,他们都处于一个只包含自己的集群当中,我们必须将各个独立的节点连接起来,构成一个包含多个节点的集群。使用CLUDSTER MEET 命令可以连接各个节点。

1、启动节点

一个节点就是一个运行在集群模式下的Redis服务器,Redis服务器在启动时会根据cluster-enavled配置选项是否为yes来决定是否开启服务器的集群模式。

节点会继续使用所有在单机模式中使用的服务器组件。

2、集群数据结构

clusterNode结构保存了一个节点的当前状态,比如节点的创建时间,节点的名字,节点当前的配置纪元,节点的IP和Port等。

每个节点都会使用一个clusterNode结构来记录自己的状态,并为集群中的所有其他节点都创建一个相应的clusterNode结构,并以此来记录其他节点的状态:

typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. 节点创建的时间*/
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size 节点的名字,由40个十六进制字符组成*/
int flags;      /* CLUSTER_NODE_... 节点标识,使用各种不同的标识值记录节点的角色,以及节点目前的状态*/
uint64_t configEpoch; /* Last configEpoch observed for this node 当前的配置纪元,用于实现故障转移*/
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node 这个节点的槽*/
int numslots;   /* Number of slots handled by this node */
int numslaves;  /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent;      /* Unix time we sent latest ping */
mstime_t pong_received;  /* Unix time we received the pong */
mstime_t fail_time;      /* Unix time when FAIL flag was set */
mstime_t voted_time;     /* Last time we voted for a slave of this master */
mstime_t repl_offset_time;  /* Unix time we received offset for this node */
mstime_t orphaned_time;     /* Starting time of orphaned master condition */
long long repl_offset;      /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */
int port;                   /* Latest known port of this node */
clusterLink *link;          /* TCP/IP link with this node 保存连接节点所需的有关信息
10baf
*/
list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

typedef struct clusterLink {
mstime_t ctime;             /* Link creation time 连接创建时间*/
int fd;                     /* TCP socket file descriptor tcp连接描述符*/
sds sndbuf;                 /* Packet send buffer 输出缓冲区*/
sds rcvbuf;                 /* Packet reception buffer 输入缓冲区*/
struct clusterNode *node;   /* Node related to this link if any, or NULL 与这个链接相关联的节点,没有就为NULL*/
} clusterLink;


另外每个节点都保存着一个clusterState结构,这个结构记录了在当前节点视角下,集群目前所处的状态,例如集群是在线还是下线,包含多少个节点,当前的配置纪元等:

typedef struct clusterState {
clusterNode *myself;  /* This node 指向当前节点的指针*/
uint64_t currentEpoch;//当前纪元,用于实现故障转移
int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size;             /* Num of master nodes with at least one slot 集群中至少矗立着一个槽节点的数量*/
dict *nodes;          /* Hash table of name -> clusterNode structures 集群节点名单,字典的键为节点名称,值为对应的clusterNode结构*/
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count;    /* Number of votes received so far. */
int failover_auth_sent;     /* True if we already asked for votes. */
int failover_auth_rank;     /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason;   /* Why a slave is currently not able to failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end;            /* Manual failover time limit (ms unixtime).It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave;      /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF or zero if stil not received. */
int mf_can_start;           /* If non-zero signal that the manual failover can start requesting masters vote. */
/* The followign fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
long long stats_bus_messages_sent;  /* Num of msg sent via cluster bus. */
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
} clusterState;


3、CLUSTER MEET命令的实现

通过向源节点发送CLLUSTER MEET命令,客户端可以让接收命令的节点将另一个目标节点添加到源节点当前所在的集群中:

源节点收到命令后会与目标节点进行握手,来确认彼此的存在,然后进行下一步:

-1、源节点会为目标节点创建一个clusterNode结构,并添加到自己的clusterState.nodes字典中;

-2、源节点根据命令给定的ip和port,向目标节点发送一条MEET消息;

-3、如果顺利,目标节点收到源节点发送的MEET消息,目标节点会为源节点创建一个clusterNode结构,并添加到自己的clusterState.nodes字典中;

-4、目标节点返回源节点一条PONG消息;

-5、如果顺利,源节点收到PONG消息,知道目标节点已经成功接收到了自己的MEET消息。

-6、源节点向目标节点返回一条PING消息。

-7、如果顺利,目标节点收到PING消息,知道源节点收到了PONG,握手完成。

之后,源节点会通过Gossip协议传播目标节点的信息给集群中的其他节点,让其他节点也和目标节点握手,经过一段时间之后,目标节点会被集群中的所有节点认识。

二、槽指派

Redis集群通过分片的方式来保存数据库中的键值对:几群的整个数据库被分为16384个槽,数据库中的每个键都属于其中一个,集群中的每个节点可以处理0个或最多16384个槽。

当数据库中的16384个槽都有节点在处理时,集群处于上线状态;相反地,如果数据库中有任何一个槽没有得到处理,那么集群处于下线状态。

通过向节点发送CLUSTER ADDSLOTS命令,可以将一个或者多个槽指派给节点负责:CLUSTER ADDSLOTS [slot …]

1、记录节点的槽指派信息

clusterNode结构的slots和numslot记录了节点负责处理那些槽。

slots是一个二进制位数据,长度为16384/8=2018个字节。

Redis以0为索引,16383为终止索引,对slots数组中的16384个位进行编号,并根据索引i上的二进制位来判断节点是否负责处理槽i:1true,0false;

numslot记录节点负责处理的槽的数量。

2、传播节点的槽指派信息

一个节点除了会将自己负责处理的槽记录在clusterNode结构得slots属性和numslots之外,还会将自己的slots数组通过消息发送给集群中的其他节点,告诉其他节点自己负责哪些槽。

当其他节点收到这个消息后,会在自己的nodes字典中找到对应的node结构,对结构中的slots数组即兴保存或者更新。

3、记录集群所有槽的指派信息

clusterState结构中的slots数组记录了集群中所有16384个槽的指派信息:clusterNode *slots[16384];

其中每个指针都是一个指向clusterNode结构的指针;

4、CLUSTER ADDSLOTS命令的实现

-1、遍历所有输入槽,检查它们是否都是未指派槽;如果有一个槽已经被指派了,返回错误;

-2、再次遍历所有输入槽,讲这些槽指派给当前节点,设置clusterState。slots[i]和clusterState.myself.slots;

当命令执行完毕周后,节点会通过发送消息告知集群中的其他节点,自己目前正在负责处理哪些槽;

三、在集群中执行命令

在对数据库中的16384个槽都进行了指派后,集群进入上线状态,这是客户端就可以向集群中的节点发送数据命令了。

当客户端向节点发送与数据库键有关的命令时,接收命令的节点会计算出命令要处理的数据库键属于哪个槽,并检查这个槽是否指派给了自己,如果指派给了自己,那么节点直接执行这个命令。如果没有指派给自己,那么节点会向客户端返回一个MOVED错误,指引客户端转向正确的节点,并再次发送之前想要执行的命令。

1、计算键属于哪个槽

使用CLUSTER KEYSLOT 命令可以查看一个给定键属于哪个槽。

使用CRC64(key)计算给定键属于哪个槽。

2、判断槽是否由当前节点处理

计算出槽的编号后,节点会检查自己在clusterState.slot数组中的项i,判断是否由自己处理:

如果slots[i]等于clusterState.myself,那么说明节点可以执行客户端发送的命令。

如果不等于,那么节点会根据slot[i]指向的clusterNode结构所记录的节点ip和port,向客户端返回MOVED错误。

3、MOVED错误

格式为:MOVED :

当客户端收到节点返回的MOVED错误时,会根据提供的ip和port转向至负责处理该槽的节点。一般情况下集群客户端会与集群中的多个节点创建套接字连接,节点转向其实就是换个套接字发送命令,若尚未建立连接,那么会根据MOVED错误提供的ip和port连接节点,再进行转向。

4、节点数据库的实现

节点只能使用0号数据库。

节点除了将键值对保存在数据库里面之外,节点还会用clusterState结构中的slots_to_keys跳跃表来保存槽和键之间的关系:每个节点的分值都是一个槽号,每个节点的成员都是一个数据库键;通过在跳跃表中记录各个数据库键所属的槽,节点可以很方便的对属于某个或某些槽的所有数据库键进行批量操作。

四、重新分片

Redis集群的重新分片操作可以将任意数量已经指派给某个节点的槽改为指派给另一个节点,并且相关槽所属的键值对也会从源节点被移动到目标节点。

Redis集群的重新分片操作是由Redis的集群管理软件redis-trib负责执行的,Redis提供了进行重新分片所需的所有命令,而redis-trib则通过向源节点和目标节点发送命令来进行重新分片操作。

对集群单个槽进行重新分片的步骤:

-1、redis-trib对目标节点发送CLUSTER SETSLOT IMPORTING 命令,让目标节点准备好从源节点导入属于槽的键值对;

-2、redis-trib队员节点发送CLUSTER SETSLOT MIGRATING 命令,让源节点准备好将属于槽slot的键值对迁移至目标节点;

-3、redis-trib向源节点发送CLUSTER GETKEYSINSLOT 命令,获得最多count个属于槽的键值对键名;

-4、对每个键名,redis-trib都向源节点发送一个MIGRATE 0 命令,将被选中的键原子地从源节点迁移到目标节点;

-5、重复执行步骤3、4,指导源节点保存的所有属于槽的键值对都被迁移到目标节点为止;

-6、redis-trib向集群中的任意一个节点发送CLUSTER SETSLOT NODE 命令,将槽指派给目标节点,这一指派信息会通过消息发送至整个集群,最终集群中的所有节点都会知道槽已经指派给了目标节点。

五、ASK错误

在进行重新分片期间,源节点向目标节点迁移一个槽的过程中,可能会出现属于被迁移槽的一部分键值对保存在源节点中,而另一部分则保存在目标节点中。

当客户端向源节点发送一个与数据库键有关的命令,并且命令要处理的数据库键恰好就属于正在被迁移的槽时:源节点会先在自己的数据库里查找指定的键,找到则执行,否则向客户端返回一个ASK错误,指引客户端转向正在导入槽的目标节点,再次发送之前的命令。

1、CLUSTER SETSLOT IMPORTING命令的实现

clusterState结构的importing_slots_from数组记录了当前节点正在从其他节点导入的槽,如果importing_slots_from[i]不为NULL,而是指向一个clusterNode结构,那么表示当前节点正在从clusterNode所代表的节点导入槽i。

2、CLUSTER SETSLOT MIGRATING 命令的实现

clusterState结构的migrating_slots_to数组记录了当前节点正在迁移至其他节点的槽。如果migrating_slots_to[i]不为NULL,而是指向一个clusterNode,那么表示当前节点正在将槽i迁移至clusterNode所代表的节点。

3、ASKING命令

如果一个节点在迁移槽期间收到了关于键key的命令请求,key属于这个槽,那么客户端会先在自己的数据库中查找key,如果没有找到,通过检查自己的clusterState.migrating_slots_to[i],会发现自己正在将槽迁移至其他节点,于是向客户端返回ASK 错误,当客户端接收到这个ip和port后,会转向至目标节点,然后发送一个ASKING命令,之后再重新发送原本想要执行的命令。

ASKING命令主要用来打开REDIS_ASKING标识,这是一个一次性标识,当执行了一次命令后,这个标识就会被移除。

六、复制与故障转移

Redis集群中的节点分为主节点和从节点,其中主节点用于处理槽,从节点用于复制某个主节点,并在被复制的主节点下线时,代替下线主节点继续处理命令请求。

在其中一个主节点下线时,集群中仍然在运作的主节点将在节点的从节点中选出一个节点作为新的主节点,下线主节点的从节点开始成为这个新主节点的从属,如果旧的主节点重新上线,它会成为新主节点的从属。

1、设置从节点

向一个节点发送命令 CLUSTER REPLICATE 可以让接收命令的节点成为指定节点的从节点,并开始对主节点进行复制。

-1、在自己的nodes字典中找到id对应的节点node结构,struct clusterNode *slaveof会指向主节点。

-2、修改自己在myself.flags中的属性,关闭原本的REDIS_NODE_MASTER标识,打开REDIS_NODE_SLAVE标识,标识这个节点已经由原来的主节点变成了从节点。

一个节点成为从节点,并开始复制某个主节点这一信息会通过消息发送给集群中的其他节点,最终集群中的所有节点都会知道某个从节点正在复制某个主节点。

集群中的所有节点都会在代表主节点的clusterNode结构的slaves属性和numslaves属性中记录正在复制这个主节点的从节点名单。

2、故障检测

集群中每个节点都会定期地向集群中的其他节点发送PING消息,以此来检测对方是否在线,如果接收PING消息的界定啊没有在规定时间内,向发送PING消息的界定啊返回PONG消息,那么发送PING消息的节点就会将接收PING消息的节点标记为疑似下线:REDIS_NODE_PFAIL标识;

集群中的各个节点会通过互相发送消息的方式来交换集群中各个节点的状态信息。

当一个主节点A通过消息得知主节点B认为主节点C进入了疑似下线状态时,主节点A会在自己的nodes字典中找到C所对应的node结构,并将主节点B的下线报告添加到clusterNode结构的fail_reports链表中。每个下线报告由一个clusterNodeFailReport结构表示:

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
struct clusterNode *node;  /* Node reporting the failure condition. 报告目标节点已经下线的节点*/
mstime_t time;             /* Time of the last report from this node. 最后一次从node节点收到下线报告的时间*/
} clusterNodeFailReport;


如果在一个集群中,半数以上负责处理槽的主节点都将某个主节点x报告为疑似下线,那么这个主节点x将被标记为已下线FAIL,将主节点标记为已下线的节点会向集群广播一条关于主节点x的FAIL消息,收到这条消息的节点都会立即将主节点x标位下线。

3、故障转移

当一个从节点发现自己正在复制的主节点进入了已下线状态时,从节点将开始对下线主节点进行故障转移:

-1、复制下线主节点的从节点中,将会有一个节点被选出;

-2、被选中的从节点会执行slaveof no one命令,成为新的主节点;

-3、新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己;

-4、新的主节点向集群广播一条PONG消息,这条PONG消息可以让集群中的其他节点立即知道这个节点已经由从节点变成了主节点,并且这个主节点已经接管了原本由已下线节点负责处理的槽;

-5、新的主节点开始接收和自己负责槽有关的命令请求。

4、选举新的主节点

-1、集群的配置纪元是一个自增计数器,从0开始;

-2、当某个节点开始一次故障转移时,配置纪元的值会+1;

-3、对于每个配置纪元,集群里每个负责处理槽的主节点都有一次投票的机会,而第一个向主节点要求投票的从节点将获得主节点的投票;

-4、当从节点发现自己正在复制的主节点进入已下线状态时,会广播一条CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求所有收到这条消息,并且具有投票权的主节点向这个从节点投票;

-5、如果一个主节点具有投票权,并且这个主节点还没有投票,它会向要求投票的从节点返回一条CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示支持;

-6、每个参与选举的从节点都会接受这条消息,并根据自己收到了多少条这个消息来统计自己的支持数;

-7、如果票数过半,这个从节点成为新的主节点。

-8、因为在每个配置纪元里,每个具有投票权的主节点只能投一次票,所以如果有N个主节点进行投票,过半的支持数只有一个。

-9、如果一个配置纪元中没有从节点收集到足够的票,那么集群进入一个新的配置纪元,再次选举。

七、消息

集群中的各个节点通过发送和接收消息来进行通信。节点发送的消息主要有五种:

-1、MEET消息:接受者加入到发送者当前所处的集群中;

-2、PING消息:集群中的每个节点每一秒钟就会从已知节点中随机选出5个节点,发送PING消息,检测对方是否在线。如果某个节点在自己的节点列表中timeout时间过半,也会发送PING消息;

-3、PONG消息:当接收者受到了MEET消息或者PING消息时,回回复PONG消息。一个节点也可以通过广播PONG消息来让集群中的其他节点立即刷新关于这个节点的认识;

-4、FAIL消息:当一个主节点A判断另一个主节点B已经进入FAIL状态时,节点A会向集群广播一条关于B的FAIL消息,收到的节点会标记B已下线;

-5、PUBLISH消息:当节点收到一个PUBLISH命令时,节点会执行这个命令,并向集群广播一条PUBLISH消息,所有接收这个消息的节点会执行相同的PUBLISH命令。

1、消息头

每个消息头都是由一个cluster.h/clusterMsg结构表示的:

typedef struct {
char sig[4];        /* Siganture "RCmb" (Redis Cluster message bus). */
uint32_t totlen;    /* Total length of this message 消息长度*/
uint16_t ver;       /* Protocol version, currently set to 0. */
uint16_t notused0;  /* 2 bytes not used. */
uint16_t type;      /* Message type 消息类型*/
uint16_t count;     /* Only used for some kind of messages. 正文包含的节点信息数量*/
uint64_t currentEpoch;  /* The epoch accordingly to the sending node. 发送者所处的配置纪元*/
uint64_t configEpoch;   /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. 主节点的配置纪元*/
uint64_t offset;    /* Master replication offset if node is a master or processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node 发送者的ID*/
unsigned char myslots[CLUSTER_SLOTS/8]发送者目前的槽指派信息;
char slaveof[CLUSTER_NAMELEN];//主节点的名字
char notused1[32];  /* 32 bytes reserved for future usage. */
uint16_t port;      /* Sender TCP base port 发送者端口号*/
uint16_t flags;     /* Sender node flags 发送者的标识*/
unsigned char state; /* Cluster state from the POV of the sender 发送者的集群状态*/
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;//消息正文
} clusterMsg;


clusterMsg.data属性指向联合cluster.h/clusterMsgData,这个联合就是消息正文:

union clusterMsgData {
/* PING, MEET and PONG 消息正文*/
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;

/* FAIL */
struct {
clusterMsgDataFail about;
} fail;

/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;

/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
};


2、MEET,PING,PONG消息的实现

节点通过消息头type属性判断消息类型。

每次发送这三个消息时,发送者都从自己的已知节点列表中随机选取两个节点,并将这两个节点的信息保存到两个clusterMsgDataGossip结构中。

/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
typedef struct {
char nodename[CLUSTER_NAMELEN];//节点名称
uint32_t ping_sent;//最后一次向该节点发送PING消息的时间戳
uint32_t pong_received;//最后一次从该节点收到PONG消息的时间戳
char ip[NET_IP_STR_LEN];  /* IP address last time it was seen 节点ip*/
uint16_t port;              /* port last time it was seen 节点端口*/
uint16_t flags;             /* node->flags copy 复制标识*/
uint16_t notused1;          /* Some room for future improvements. */
uint32_t notused2;
} clusterMsgDataGossip;


当接收者收到消息真时,会根据这两个结构判断:如果被选中节点不在接收者的节点列表,那么说明接收者是第一次接触到被选中节点,接受者根据节点信息进行握手,否则根据信息,更新clusterNode结构。

3、FAIL消息的实现

当集群里的主节点A将主节点B标记为已下线时,A将向集群广播一条关于主节点B的FAIL消息,接收到FAIL消息的节点都会将主节点B标记为已下线。

FAIL消息的正文只包含nodename属性,记录了已下线节点的名字;

4、PUBLISH消息的实现

当客户端向集群中某个节点发送PUBLISH 命令的时候,接收到PUBLISH命令的节点不仅会向channel频道发送消息,还会向集群广播一条PUBLISH消息,所有接收到这条PUBLISH消息的节点都会向channel频道发送message消息。

消息正文由clusterMsgDataPublish结构表示:

typedef struct {
uint32_t channel_len;
uint32_t message_len;
/* We can't reclare bulk_data as bulk_data[] since this structure is
* nested. The 8 bytes are removed from the count during the message
* length computation. 8字节只为了消息对齐,实际长度由保存的内容决定*/
unsigned char bulk_data[8];
} clusterMsgDataPublish;


cluster.c

待续
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐