您的位置:首页 > 数据库 > Redis

Redis源码学习简记(十)t_list原理与个人理解

2018-04-21 11:07 423 查看

       对于链表,我看的这个版本的redis只使用quicklist作为存储方式。这种数据结构结合了ziplist的存储空间的高效与双向链表的灵活的特性。在看api中,对于这个双向链表的使用,看起来其实更像队列的实现,基本上都是围绕push和pop的操作。那么知道了链表的实现的数据结构,那么就来看看redis中的api是怎么写的。

链表的push操作

/* The function pushes an element to the specified list object 'subject',
* at head or tail position as specified by 'where'.
*
* There is no need for the caller to increment the refcount of 'value' as
* the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
//链表类型的插入,需保证编码方式为快链。
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
//头插还是尾插法
value = getDecodedObject(value);
//解码
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
//调用底层api插入元素
decrRefCount(value);
//插入元素减少引用
} else {
serverPanic("Unknown list encoding");
}
}
pop操作
void *listPopSaver(unsigned char *data, unsigned int sz) {
return createStringObject((char*)data,sz);
}

robj *listTypePop(robj *subject, int where) {
//弹出元素,根据where定位
long long vlong;
robj *value = NULL;
//用于弹出后存储元素。

int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
//验证编码类型
if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
NULL, &vlong, listPopSaver)) {
/*
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz));
listPopSaver为一个函数指针。
拿到元素之后,若为字符串类型,则通过函数指针调用listPopSaver创建robj对象
*/
if (!value)//若value为空,证明拿到的是整型,那么值存在vlong中
value = createStringObjectFromLongLong(vlong);
}
} else {
serverPanic("Unknown list encoding");
}
return value;//返回上面加工好的对象。
}
返回list的元素个数
unsigned long listTypeLength(const robj *subject) {
//返回元素个数
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistCount(subject->ptr);
} else {
serverPanic("Unknown list encoding");
}
}

list类型的迭代器的构造
/* Initialize an iterator at the specified index. */
listTypeIterator *listTypeInitIterator(robj *subject, long index,
unsigned char direction) {
//初始化listType的迭代器
/*

typedef struct {
robj *subject; 迭代器的对象
unsigned char encoding;编码模式
unsigned char direction; 方向
quicklistIter *iter;原生的迭代器
} listTypeIterator;
跟hashTypeIterator 一个套路,将底层的迭代器进行封装。
quicklist的迭代器,是遍历所有的元素的,index可以为负数,说明是反向迭代器
*/
listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
li->subject = subject;
li->encoding = subject->encoding;
li->direction = direction;
li->iter = NULL;
//初始化
/* LIST_HEAD means start at TAIL and move *towards* head.
* LIST_TAIL means start at HEAD and move *towards tail. */
int iter_direction =
direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
iter_direction, index);
//这里会分配iter的空间。
//根据index拿到元素指针。

} else {
serverPanic("Unknown list encoding");
}
return li;
}
释放迭代器内存
/* Clean up the iterator. */
void listTypeReleaseIterator(listTypeIterator *li) {
//释放迭代器内存。
zfree(li->iter);
zfree(li);
}
迭代器的next操作
/* Stores pointer to current the entry in the provided entry structure
* and advances the position of the iterator. Returns 1 when the current
* entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
/* Protect from converting when iterating */
/*
Structure for an entry while iterating over a list.
typedef struct {
listTypeIterator *li;
quicklistEntry entry;  Entry in quicklist
} listTypeEntry;
其entry包括了迭代器于实体元素
*/
serverAssert(li->subject->encoding == li->encoding);

entry->li = li;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistNext(li->iter, &entry->entry);
} else {
serverPanic("Unknown list encoding");
}
return 0;
}
获取链表entry中的数据
/* Return entry or NULL at the current position of the iterator. */
robj *listTypeGet(listTypeEntry *entry) {
robj *value = NULL;
//通过entry获取真正的数据 整型或者字符串
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
if (entry->entry.value) {
value = createStringObject((char *)entry->entry.value,
entry->entry.sz);
} else {
value = createStringObjectFromLongLong(entry->entry.longval);
}
} else {
serverPanic("Unknown list encoding");
}
return value;
}
链表中的插入函数
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
//提供头插法与尾插法,robj会维护其引用计数
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
value = getDecodedObject(value);
sds str = value->ptr;
size_t len = sdslen(str);
if (where == LIST_TAIL) {
quicklistInsertAfter((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
} else if (where == LIST_HEAD) {
quicklistInsertBefore((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
}
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
还有一些比较简单的判等,删除与转化的函数
/* Compare the given object with the entry at the current position. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
} else {
serverPanic("Unknown list encoding");
}
}

/* Delete the element pointed to. */
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
//删除制定元素,并且使得iter根据方向指向删除元素的前一个元素或者后一个元素。
quicklistDelEntry(iter->iter, &entry->entry);
} else {
serverPanic("Unknown list encoding");
}
}

/* Create a quicklist from a single ziplist */
void listTypeConvert(robj *subject, int enc) {
serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
//从ziplist转化为quicklist

if (enc == OBJ_ENCODING_QUICKLIST) {
size_t zlen = server.list_max_ziplist_size;
int depth = server.list_compress_depth;
subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
subject->encoding = OBJ_ENCODING_QUICKLIST;
} else {
serverPanic("Unsupported list conversion");
}
}
一些对于t_list的一些api大概就上面的这些。而这些api是通过client的命令进行调用的,不同的命令会调用上面一个或多个的api。那么下面就来分析对于client来说,提供了哪些命令。

首先第一个为push操作

LPUSH KEY_NAME VALUE1.. VALUEN

RPUSH KEY_NAME VALUE1..VALUEN
void pushGenericCommand(client *c, int where) {
//插入命令
int j, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
//根据argv[1] 以写的方式获取 robj数据

if (lobj && lobj->type != OBJ_LIST) {//检查类型保证类型正确
addReply(c,shared.wrongtypeerr);
return;
}
//分两种情况,若查找成功则进行插入操作,若不存在则创建一个快链对象

for (j = 2; j < c->argc; j++) {
if (!lobj) {
//若不存在则创建一个新的quicklist链表
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
//设置快链配置
dbAdd(c->db,c->argv[1],lobj);
//添加该对象结构体到数据库中,以argv[1] 作为key
}
listTypePush(lobj,c->argv[j],where);
//根据where向快链中插入数据集
pushed++;//记录插入的数量
}

addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
//返回当前list元素个数。
if (pushed) {
//前插入还是后插入
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

signalModifiedKey(c->db,c->argv[1]);
//数据库被修改则调用该函数发送修改信号
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
//发送时间通知
}

server.dirty += pushed;
//增加脏数据。
}

void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}

void rpushCommand(client *c) {
pushGenericCommand(c,LIST_TAIL);
}
pushx命令 与前者区别在与该命令保证了,链表必然存在,不会自动创建链表对象。

RPUSHX KEY_NAME VALUE1..VALUEN

LPUSHX KEY_NAME VALUE1.. VALUEN
void pushxGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *subject;
//该操作保证key存在。

//以写操作获取robj对象
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;
//保证subject必定存在

for (j = 2; j < c->argc; j++) {
//存在则将所有的元素一一插入
listTypePush(subject,c->argv[j],where);
pushed++;
}

addReplyLongLong(c,listTypeLength(subject));
//返回添加后元素个数

if (pushed) {
//消息与事件的通知
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
//增加脏数据个数
}

void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}

void rpushxCommand(client *c) {
pushxGenericCommand(c,LIST_TAIL);
}

插入命令 Redis Linsert 命令用于在列表的元素前或者后插入元素。当指定元素不存在于列表中时,不执行任何操作。 

命令格式  LINSERT key BEFORE|AFTER pivot value结合命令来看,其实现就很容易看懂了。

void linsertCommand(client *c) {
//insert的底层实现
//利用迭代器,找到元素在前面或者后面插入
int where;
robj *subject;
listTypeIterator *iter;
listTypeEntry entry;
int inserted = 0;
//比较where 忽略大小写 判别是在index哪里插入
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
where = LIST_TAIL;
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
where = LIST_HEAD;
} else {
addReply(c,shared.syntaxerr);
return;
}
//保证元素必定存在
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;

/* Seek pivot from head to tail */
iter = listTypeInitIterator(subject,0,LIST_TAIL);
//初始化迭代器,方向从头到尾
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,c->argv[3])) {
//找到元素后,则进行插入操作
listTypeInsert(&entry,c->argv[4],where);
inserted = 1;
break;
}
}
listTypeReleaseIterator(iter);
//释放迭代器空间

if (inserted) {
//插入成功那么发送修改信号与事件通知
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
//增加脏数据个数
} else {
/* Notify client of a failed insert */
addReply(c,shared.cnegone);
//否则通知client插入失败
return;
}

addReplyLongLong(c,listTypeLength(subject));
}
        接下来的操作都是大同小异的,感觉只是不断的重复。另外还有一个比较特殊的东西就是list 的阻塞。阻塞与普通的pop的区别在于当执行的时候,当元素或链表不存在,那么会将请求命令的client阻塞掉。直到另外的客户端插入该元素或者阻塞超时。原来的客户端才会从阻塞的状态中恢复过来。这些api中,重要的函数就是调用blockForKeys进行阻塞。包括了下面的几个操作。

BLPOP key1 [key2 ] timeout 

移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

BRPOP key1 [key2 ] timeout 

移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

BRPOPLPUSH source destination timeout 
从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

ttimeout就是设定的时间。我们抽一个bpop看一下其实现。

/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;

if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;
//获取时间,用于计算timeout

for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
if (o->type != OBJ_LIST) {//检查类型
addReply(c,shared.wrongtypeerr);
return;
} else {
if (listTypeLength(o) != 0) {
//若元素存在,那么做法跟前面的相似,弹出元素后直接return
/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
serverAssert(value != NULL);

addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;

/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}

/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
addReply(c,shared.nullmultibulk);
return;
}
//若不存在元素调用blockForKeys进行阻塞。

/* If the list is empty or the key does not exists we must block */
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}

void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}

void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}
接下来的关键的blockForKeys实现在block.c文件中。redis对于block其实是维护了两个数据结构。一个为key链表
/* Set a client in blocking mode for the specified key (list or stream), with
* the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
* depending on the kind of operation we are waiting for an empty key in
* order to awake the client. The client is blocked for all the 'numkeys'
* keys as in the 'keys' argument. When we block for stream keys, we also
* provide an array of streamID structures: clients will be unblocked only
* when items with an ID greater or equal to the specified one is appended
* to the stream. */
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
dictEntry *de;
//dict用于维护client与key对应关系
list *l;
//普通的双向队列 即adlist 用于维护阻塞的client对列
int j;

c->bpop.timeout = timeout;
c->bpop.target = target;

if (target != NULL) incrRefCount(target);

for (j = 0; j < numkeys; j++) {
//下面一大段的代码只是将元素添加到上面两个维护的数据结构中。
/* The value associated with the key name in the bpop.keys dictionary
* is NULL for lists, or the stream ID for streams. */
void *key_data = NULL;
if (btype == BLOCKED_STREAM) {
key_data = zmalloc(sizeof(streamID));
memcpy(key_data,ids+j,sizeof(streamID));
}

/* If the key already exists in the dictionary ignore it. */
if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
zfree(key_data);
continue;
}
incrRefCount(keys[j]);

/* And in the other "side", to map keys -> clients */
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;

/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);
}
blockClient(c,btype);
}
初始化了两个阻塞的数据结构后,就可以对client进行阻塞操作了。阻塞了当然还会有unblock的操作。
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
//设置block操作
//这后面unblock就设计到client与service的通信的东西,暂时没看先不研究。
//等后面研究到了再回过头来看吧。
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
}
        对于t_list的源码分析就到这里。中间花的时间还是挺长的。主要是去了一趟杭州网易c++实习生的面试。结果还在二面挂了。真的是难过。来来去去,总觉得自己掌握的东西还是太少了,网络与数据库都几乎等于0。哎,路漫漫其修远兮,勉之!       




BRPOPLPUSH source destination timeout 
从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: