您的位置:首页 > 数据库 > Redis

redis的发布订阅模式

2016-07-03 11:22 627 查看
概要redis的每个server实例都维护着一个保存服务器状态的redisServer结构struct redisServer{ /* Pubsub */ // 字典,键为频道,值为链表 // 链表中保存了所有订阅某个频道的客户端 // 新客户端总是被添加到链表的表尾 dict *pubsub_channels; /* Map channels to list of subscribed clients */ // 这个链表记录了客户端订阅的所有模式的名字 list *pubsub_patterns; /* A list of pubsub_patterns */};pubsub_channels记录了所有客户端订阅的频道的信息。数据类型是dict,dict中的key代表了channel,而val存储了关注频道的所有clients。redis的发布订阅模式


订阅者通过sub命令订阅频道,server使用pub命令把消息推送到符合条件的pubsub_channels中。

内部数据结构pubsub_channels是一个字典结构,字典内部使用hash表存储和索引数据。实现字典的可选数据结构 hash表:简单但是不稳定的基于数组的冲突解决法;简单且平均效率稳定的链式地址的冲突解决法; 字典树:使用树结构。redis采用hash的链式地址法作为实现,好处是直观、简单、可期待平均时间复杂度较小且平稳。typedef struct dict { // 类型特定函数 dictType *type; // 私有数据 void *privdata; // 哈希表 dictht ht[2]; // rehash 索引 // 当 rehash 不在进行时,值为 -1 int rehashidx; /* rehashing not in progress if rehashidx == -1 */ // 目前正在运行的安全迭代器的数量 int iterators; /* number of iterators currently running */} dict;dict使用两个hash表进行索引,当进行rehash的时候使用ht[1]的hash表,其他时候都使用hd[0]的hash表。使用的hash表的定义如下typedef struct dictht { // 哈希表数组 dictEntry **table; // 哈希表大小 unsigned long size; // 哈希表大小掩码,用于计算索引值 // 总是等于 size - 1 unsigned long sizemask; // 该哈希表已有节点的数量 unsigned long used;} dictht;table是一个存储dictEntry*指针的数组,table中的每一项都是一个指向DictEntry结构的指针,同时每一项也都带有一个指向下一项的指针。可以看出这是一个使用链地址法解决冲突的hash结构。dictEntry的定义,包含key、value、next。/* * 哈希表节点 */typedef struct dictEntry { // 键 void *key; // 值 union { void *val; uint64_t u64; int64_t s64; } v; // 指向下个哈希表节点,形成链表 struct dictEntry *next;} dictEntry;客户端订阅频道就是字典中插入新元素的过程 // 关联示意图 // { // 频道名 订阅频道的客户端 // 'channel-a' : [c1, c2, c3], // 'channel-b' : [c5, c2, c1], // 'channel-c' : [c10, c2, c1] // } /* Add the client to the channel -> list of clients hash table */ // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表 // 如果 channel 不存在于字典,那么添加进去 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } // before: // 'channel' : [c1, c2] // after: // 'channel' : [c1, c2, c3] // 将客户端添加到链表的末尾 listAddNodeTail(clients,c);1.查询server是否包含指定频道2.如果频道存在就获取频道指向的list地址;如果频道不存在,就创建频道和list3.使用步骤2的list,将客户端添加到list的末尾。完成这三步以后,server维护的发布订阅频道就新增了一个频道和关注的客户端,server发布时检测发布订阅字典,获取订阅客户端并依次发送。server发布消息到channel/* Send to clients listening for that channel */ // 取出包含所有订阅频道 channel 的客户端的链表 // 并将消息发送给它们 de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; // 遍历客户端链表,将 message 发送给它们 listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; // 回复客户端。 // 示例: // 1) "message" // 2) "xxx" // 3) "hello" addReply(c,shared.mbulkhdr[3]); // "message" 字符串 addReply(c,shared.messagebulk); // 消息的来源频道 addReplyBulk(c,channel); // 消息内容 addReplyBulk(c,message); // 接收客户端计数 receivers++; } }结合订阅者订阅的过程,发布过程就是一个查找订阅者并轮询发送消息给订阅者的过程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: