Redis源码学习简记(十)t_list原理与个人理解
2018-04-21 11:07
423 查看
对于链表,我看的这个版本的redis只使用quicklist作为存储方式。这种数据结构结合了ziplist的存储空间的高效与双向链表的灵活的特性。在看api中,对于这个双向链表的使用,看起来其实更像队列的实现,基本上都是围绕push和pop的操作。那么知道了链表的实现的数据结构,那么就来看看redis中的api是怎么写的。
链表的push操作
/* The function pushes an element to the specified list object 'subject', * at head or tail position as specified by 'where'. * * There is no need for the caller to increment the refcount of 'value' as * the function takes care of it if needed. */ void listTypePush(robj *subject, robj *value, int where) { //链表类型的插入,需保证编码方式为快链。 if (subject->encoding == OBJ_ENCODING_QUICKLIST) { int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL; //头插还是尾插法 value = getDecodedObject(value); //解码 size_t len = sdslen(value->ptr); quicklistPush(subject->ptr, value->ptr, len, pos); //调用底层api插入元素 decrRefCount(value); //插入元素减少引用 } else { serverPanic("Unknown list encoding"); } }pop操作
void *listPopSaver(unsigned char *data, unsigned int sz) { return createStringObject((char*)data,sz); } robj *listTypePop(robj *subject, int where) { //弹出元素,根据where定位 long long vlong; robj *value = NULL; //用于弹出后存储元素。 int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL; if (subject->encoding == OBJ_ENCODING_QUICKLIST) { //验证编码类型 if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value, NULL, &vlong, listPopSaver)) { /* int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *sval, void *(*saver)(unsigned char *data, unsigned int sz)); listPopSaver为一个函数指针。 拿到元素之后,若为字符串类型,则通过函数指针调用listPopSaver创建robj对象 */ if (!value)//若value为空,证明拿到的是整型,那么值存在vlong中 value = createStringObjectFromLongLong(vlong); } } else { serverPanic("Unknown list encoding"); } return value;//返回上面加工好的对象。 }返回list的元素个数
unsigned long listTypeLength(const robj *subject) { //返回元素个数 if (subject->encoding == OBJ_ENCODING_QUICKLIST) { return quicklistCount(subject->ptr); } else { serverPanic("Unknown list encoding"); } }
list类型的迭代器的构造
/* Initialize an iterator at the specified index. */ listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) { //初始化listType的迭代器 /* typedef struct { robj *subject; 迭代器的对象 unsigned char encoding;编码模式 unsigned char direction; 方向 quicklistIter *iter;原生的迭代器 } listTypeIterator; 跟hashTypeIterator 一个套路,将底层的迭代器进行封装。 quicklist的迭代器,是遍历所有的元素的,index可以为负数,说明是反向迭代器 */ listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); li->subject = subject; li->encoding = subject->encoding; li->direction = direction; li->iter = NULL; //初始化 /* LIST_HEAD means start at TAIL and move *towards* head. * LIST_TAIL means start at HEAD and move *towards tail. */ int iter_direction = direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD; if (li->encoding == OBJ_ENCODING_QUICKLIST) { li->iter = quicklistGetIteratorAtIdx(li->subject->ptr, iter_direction, index); //这里会分配iter的空间。 //根据index拿到元素指针。 } else { serverPanic("Unknown list encoding"); } return li; }释放迭代器内存
/* Clean up the iterator. */ void listTypeReleaseIterator(listTypeIterator *li) { //释放迭代器内存。 zfree(li->iter); zfree(li); }迭代器的next操作
/* Stores pointer to current the entry in the provided entry structure * and advances the position of the iterator. Returns 1 when the current * entry is in fact an entry, 0 otherwise. */ int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { /* Protect from converting when iterating */ /* Structure for an entry while iterating over a list. typedef struct { listTypeIterator *li; quicklistEntry entry; Entry in quicklist } listTypeEntry; 其entry包括了迭代器于实体元素 */ serverAssert(li->subject->encoding == li->encoding); entry->li = li; if (li->encoding == OBJ_ENCODING_QUICKLIST) { return quicklistNext(li->iter, &entry->entry); } else { serverPanic("Unknown list encoding"); } return 0; }获取链表entry中的数据
/* Return entry or NULL at the current position of the iterator. */ robj *listTypeGet(listTypeEntry *entry) { robj *value = NULL; //通过entry获取真正的数据 整型或者字符串 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) { if (entry->entry.value) { value = createStringObject((char *)entry->entry.value, entry->entry.sz); } else { value = createStringObjectFromLongLong(entry->entry.longval); } } else { serverPanic("Unknown list encoding"); } return value; }链表中的插入函数
void listTypeInsert(listTypeEntry *entry, robj *value, int where) { //提供头插法与尾插法,robj会维护其引用计数 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) { value = getDecodedObject(value); sds str = value->ptr; size_t len = sdslen(str); if (where == LIST_TAIL) { quicklistInsertAfter((quicklist *)entry->entry.quicklist, &entry->entry, str, len); } else if (where == LIST_HEAD) { quicklistInsertBefore((quicklist *)entry->entry.quicklist, &entry->entry, str, len); } decrRefCount(value); } else { serverPanic("Unknown list encoding"); } }还有一些比较简单的判等,删除与转化的函数
/* Compare the given object with the entry at the current position. */ int listTypeEqual(listTypeEntry *entry, robj *o) { if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) { serverAssertWithInfo(NULL,o,sdsEncodedObject(o)); return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr)); } else { serverPanic("Unknown list encoding"); } } /* Delete the element pointed to. */ void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) { if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) { //删除制定元素,并且使得iter根据方向指向删除元素的前一个元素或者后一个元素。 quicklistDelEntry(iter->iter, &entry->entry); } else { serverPanic("Unknown list encoding"); } } /* Create a quicklist from a single ziplist */ void listTypeConvert(robj *subject, int enc) { serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST); serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST); //从ziplist转化为quicklist if (enc == OBJ_ENCODING_QUICKLIST) { size_t zlen = server.list_max_ziplist_size; int depth = server.list_compress_depth; subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr); subject->encoding = OBJ_ENCODING_QUICKLIST; } else { serverPanic("Unsupported list conversion"); } }一些对于t_list的一些api大概就上面的这些。而这些api是通过client的命令进行调用的,不同的命令会调用上面一个或多个的api。那么下面就来分析对于client来说,提供了哪些命令。
首先第一个为push操作
LPUSH KEY_NAME VALUE1.. VALUEN
RPUSH KEY_NAME VALUE1..VALUEN
void pushGenericCommand(client *c, int where) { //插入命令 int j, pushed = 0; robj *lobj = lookupKeyWrite(c->db,c->argv[1]); //根据argv[1] 以写的方式获取 robj数据 if (lobj && lobj->type != OBJ_LIST) {//检查类型保证类型正确 addReply(c,shared.wrongtypeerr); return; } //分两种情况,若查找成功则进行插入操作,若不存在则创建一个快链对象 for (j = 2; j < c->argc; j++) { if (!lobj) { //若不存在则创建一个新的quicklist链表 lobj = createQuicklistObject(); quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size, server.list_compress_depth); //设置快链配置 dbAdd(c->db,c->argv[1],lobj); //添加该对象结构体到数据库中,以argv[1] 作为key } listTypePush(lobj,c->argv[j],where); //根据where向快链中插入数据集 pushed++;//记录插入的数量 } addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0)); //返回当前list元素个数。 if (pushed) { //前插入还是后插入 char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; signalModifiedKey(c->db,c->argv[1]); //数据库被修改则调用该函数发送修改信号 notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); //发送时间通知 } server.dirty += pushed; //增加脏数据。 } void lpushCommand(client *c) { pushGenericCommand(c,LIST_HEAD); } void rpushCommand(client *c) { pushGenericCommand(c,LIST_TAIL); }pushx命令 与前者区别在与该命令保证了,链表必然存在,不会自动创建链表对象。
RPUSHX KEY_NAME VALUE1..VALUEN
LPUSHX KEY_NAME VALUE1.. VALUEN
void pushxGenericCommand(client *c, int where) { int j, pushed = 0; robj *subject; //该操作保证key存在。 //以写操作获取robj对象 if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,subject,OBJ_LIST)) return; //保证subject必定存在 for (j = 2; j < c->argc; j++) { //存在则将所有的元素一一插入 listTypePush(subject,c->argv[j],where); pushed++; } addReplyLongLong(c,listTypeLength(subject)); //返回添加后元素个数 if (pushed) { //消息与事件的通知 char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; //增加脏数据个数 } void lpushxCommand(client *c) { pushxGenericCommand(c,LIST_HEAD); } void rpushxCommand(client *c) { pushxGenericCommand(c,LIST_TAIL); }
插入命令 Redis Linsert 命令用于在列表的元素前或者后插入元素。当指定元素不存在于列表中时,不执行任何操作。
命令格式 LINSERT key BEFORE|AFTER pivot value结合命令来看,其实现就很容易看懂了。
void linsertCommand(client *c) { //insert的底层实现 //利用迭代器,找到元素在前面或者后面插入 int where; robj *subject; listTypeIterator *iter; listTypeEntry entry; int inserted = 0; //比较where 忽略大小写 判别是在index哪里插入 if (strcasecmp(c->argv[2]->ptr,"after") == 0) { where = LIST_TAIL; } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) { where = LIST_HEAD; } else { addReply(c,shared.syntaxerr); return; } //保证元素必定存在 if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,subject,OBJ_LIST)) return; /* Seek pivot from head to tail */ iter = listTypeInitIterator(subject,0,LIST_TAIL); //初始化迭代器,方向从头到尾 while (listTypeNext(iter,&entry)) { if (listTypeEqual(&entry,c->argv[3])) { //找到元素后,则进行插入操作 listTypeInsert(&entry,c->argv[4],where); inserted = 1; break; } } listTypeReleaseIterator(iter); //释放迭代器空间 if (inserted) { //插入成功那么发送修改信号与事件通知 signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,"linsert", c->argv[1],c->db->id); server.dirty++; //增加脏数据个数 } else { /* Notify client of a failed insert */ addReply(c,shared.cnegone); //否则通知client插入失败 return; } addReplyLongLong(c,listTypeLength(subject)); }接下来的操作都是大同小异的,感觉只是不断的重复。另外还有一个比较特殊的东西就是list 的阻塞。阻塞与普通的pop的区别在于当执行的时候,当元素或链表不存在,那么会将请求命令的client阻塞掉。直到另外的客户端插入该元素或者阻塞超时。原来的客户端才会从阻塞的状态中恢复过来。这些api中,重要的函数就是调用blockForKeys进行阻塞。包括了下面的几个操作。
移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BRPOPLPUSH source destination timeout
从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
ttimeout就是设定的时间。我们抽一个bpop看一下其实现。
/* Blocking RPOP/LPOP */ void blockingPopGenericCommand(client *c, int where) { robj *o; mstime_t timeout; int j; if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) != C_OK) return; //获取时间,用于计算timeout for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); if (o != NULL) { if (o->type != OBJ_LIST) {//检查类型 addReply(c,shared.wrongtypeerr); return; } else { if (listTypeLength(o) != 0) { //若元素存在,那么做法跟前面的相似,弹出元素后直接return /* Non empty list, this is like a non normal [LR]POP. */ char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; robj *value = listTypePop(o,where); serverAssert(value != NULL); addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(NOTIFY_LIST,event, c->argv[j],c->db->id); if (listTypeLength(o) == 0) { dbDelete(c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del", c->argv[j],c->db->id); } signalModifiedKey(c->db,c->argv[j]); server.dirty++; /* Replicate it as an [LR]POP instead of B[LR]POP. */ rewriteClientCommandVector(c,2, (where == LIST_HEAD) ? shared.lpop : shared.rpop, c->argv[j]); return; } } } } /* If we are inside a MULTI/EXEC and the list is empty the only thing * we can do is treating it as a timeout (even with timeout 0). */ if (c->flags & CLIENT_MULTI) { addReply(c,shared.nullmultibulk); return; } //若不存在元素调用blockForKeys进行阻塞。 /* If the list is empty or the key does not exists we must block */ blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL); } void blpopCommand(client *c) { blockingPopGenericCommand(c,LIST_HEAD); } void brpopCommand(client *c) { blockingPopGenericCommand(c,LIST_TAIL); }接下来的关键的blockForKeys实现在block.c文件中。redis对于block其实是维护了两个数据结构。一个为key链表
/* Set a client in blocking mode for the specified key (list or stream), with * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM * depending on the kind of operation we are waiting for an empty key in * order to awake the client. The client is blocked for all the 'numkeys' * keys as in the 'keys' argument. When we block for stream keys, we also * provide an array of streamID structures: clients will be unblocked only * when items with an ID greater or equal to the specified one is appended * to the stream. */ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { dictEntry *de; //dict用于维护client与key对应关系 list *l; //普通的双向队列 即adlist 用于维护阻塞的client对列 int j; c->bpop.timeout = timeout; c->bpop.target = target; if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { //下面一大段的代码只是将元素添加到上面两个维护的数据结构中。 /* The value associated with the key name in the bpop.keys dictionary * is NULL for lists, or the stream ID for streams. */ void *key_data = NULL; if (btype == BLOCKED_STREAM) { key_data = zmalloc(sizeof(streamID)); memcpy(key_data,ids+j,sizeof(streamID)); } /* If the key already exists in the dictionary ignore it. */ if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { zfree(key_data); continue; } incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); serverAssertWithInfo(c,keys[j],retval == DICT_OK); } else { l = dictGetVal(de); } listAddNodeTail(l,c); } blockClient(c,btype); }初始化了两个阻塞的数据结构后,就可以对client进行阻塞操作了。阻塞了当然还会有unblock的操作。
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { //设置block操作 //这后面unblock就设计到client与service的通信的东西,暂时没看先不研究。 //等后面研究到了再回过头来看吧。 c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; }对于t_list的源码分析就到这里。中间花的时间还是挺长的。主要是去了一趟杭州网易c++实习生的面试。结果还在二面挂了。真的是难过。来来去去,总觉得自己掌握的东西还是太少了,网络与数据库都几乎等于0。哎,路漫漫其修远兮,勉之!
BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
相关文章推荐
- Redis源码学习-NoSql复杂类型对象的hash管理(二:List)
- redis源码分析(四)、redis命令学习总结—链表List
- 3.5学习内容 宽带测速原理,RESTful架构,rpc个人理解
- redis源码阅读理解,及相关语言细节---adlist.c
- redis源码分析(八)、redis数据结构之压缩ziplist--------ziplist.c ziplist.h学习笔记
- 结合redis设计与实现的redis源码学习-8.3-t_list.c(列表键)
- redis源码分析(9)redis源码链表学习总结 adlist.h adlist.c
- 结合redis设计与实现的redis源码学习-5-skiplist(跳跃表)
- 源码分析redis的有序集合,学习skiplist跳跃表数据结构
- 学习common-upload源码,理解上传原理
- Redis学习记录之命令List(十一)
- ibatis源码学习(三)参数和结果的映射原理
- 深入理解View知识系列一- setContentView和LayoutInflater源码原理分析
- 深度理解Android InstantRun原理以及源码分析
- 深度学习U-net个人理解
- git学习笔记 -- day01 原理、安装、工作流程、三种装态、设置个人信息
- Elasticsearch学习随笔(一)--原理理解与5.0核心插件部署过程
- 向优秀代码学习:Redis 源码概览
- spring学习总结 - ioc原理及ID理解
- Redis的个人学习总结