您的位置:首页 > 数据库 > Redis

结合redis设计与实现的redis源码学习-8.3-t_list.c(列表键)

2017-11-04 00:04 1101 查看
T_list.c主要说明了列表键和列表键的命令实现方式;

#include "server.h"
//将一个值放入列表中,根据where判断是头插还是尾插
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);//这个函数在quicklist.h中
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
//列表弹出保护程序
void *listPopSaver(unsigned char *data, unsigned int sz) {
return createStringObject((char*)data,sz);
}
//根据列表类型弹出数据
robj *listTypePop(robj *subject, int where) {
long long vlong;
robj *value = NULL;

int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
NULL, &vlong, listPopSaver)) {
if (!value)
value = createStringObjectFromLongLong(vlong);
}
} else {
serverPanic("Unknown list encoding");
}
return value;
}
//返回列表长度
unsigned long listTypeLength(robj *subject) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistCount(subject->ptr);
} else {
serverPanic("Unknown list encoding");
}
}
/* Initialize an iterator at the specified index. 在制定索引创建一个迭代器*/
listTypeIterator *listTypeInitIterator(robj *subject, long index,
unsigned char direction) {
listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
li->subject = subject;
li->encoding = subject->encoding;
li->direction = direction;
li->iter = NULL;
/* LIST_HEAD means start at TAIL and move *towards* head.
* LIST_TAIL means start at HEAD and move *towards tail. */
int iter_direction =
direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
iter_direction, index);
} else {
serverPanic("Unknown list encoding");
}
return li;
}
/* Clean up the iterator. 释放迭代器指向的数据和迭代器*/
void listTypeReleaseIterator(listTypeIterator *li) {
zfree(li->iter);
zfree(li);
}
//迭代器指向下一元素
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
/* Protect from converting when iterating */
serverAssert(li->
10d98
subject->encoding == li->encoding);

entry->li = li;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistNext(li->iter, &entry->entry);
} else {
serverPanic("Unknown list encoding");
}
return 0;
}
/* Return entry or NULL at the current position of the iterator. 返回当前迭代器指向的元素信息*/
robj *listTypeGet(listTypeEntry *entry) {
robj *value = NULL;
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
if (entry->entry.value) {
value = createStringObject((char *)entry->entry.value,
entry->entry.sz);
} else {
value = createStringObjectFromLongLong(entry->entry.longval);
}
} else {
serverPanic("Unknown list encoding");
}
return value;
}
//给list插入元素
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
value = getDecodedObject(value);
sds str = value->ptr;
size_t len = sdslen(str);
if (where == LIST_TAIL) {
quicklistInsertAfter((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
} else if (where == LIST_HEAD) {
quicklistInsertBefore((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
}
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}

/* Compare the given object with the entry at the current position. 对比当前位置的对象*/
int listTypeEqual(listTypeEntry *entry, robj *o) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
} else {
serverPanic("Unknown list encoding");
}
}

/* Delete the element pointed to.删除元素的指针 */
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistDelEntry(iter->iter, &entry->entry);
} else {
serverPanic("Unknown list encoding");
}
}

/* Create a quicklist from a single ziplist 从一个压缩列表创建一个快速链表*/
void listTypeConvert(robj *subject, int enc) {
serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);

if (enc == OBJ_ENCODING_QUICKLIST) {
size_t zlen = server.list_max_ziplist_size;
int depth = server.list_compress_depth;
subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
subject->encoding = OBJ_ENCODING_QUICKLIST;
} else {
serverPanic("Unsupported list conversion");
}
}

/*-----------------------------------------------------------------------------
* List Commands
*----------------------------------------------------------------------------*/
//通用的push函数,对外提供的命令函数会调用这个接口
void pushGenericCommand(client *c, int where) {
int j, waiting = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
//可能会插入多个,遍历参数并插入
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}

void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}

void rpushCommand(client *c) {
pushGenericCommand(c,LIST_TAIL);
}
//只有当 key 已经存在并且存着一个 list 的时候,才能在这个 key 下面的 list 插入 value。
void pushxGenericCommand(client *c, robj *refval, robj *val, int where) {
robj *subject;
listTypeIterator *iter;
listTypeEntry entry;
int inserted = 0;

if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;

if (refval != NULL) {
/* Seek refval from head to tail */
iter = listTypeInitIterator(subject,0,LIST_TAIL);
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,refval)) {
listTypeInsert(&entry,val,where);
inserted = 1;
break;
}
}
listTypeReleaseIterator(iter);

if (inserted) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
} else {
/* Notify client of a failed insert */
addReply(c,shared.cnegone);
return;
}
} else {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

listTypePush(subject,val,where);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
server.dirty++;
}

addReplyLongLong(c,listTypeLength(subject));
}

void lpushxCommand(client *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],LIST_HEAD);
}

void rpushxCommand(client *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],LIST_TAIL);
}
//链表插入元素
void linsertCommand(client *c) {
c->argv[4] = tryObjectEncoding(c->argv[4]);
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
pushxGenericCommand(c,c->argv[3],c->argv[4],LIST_TAIL);
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
pushxGenericCommand(c,c->argv[3],c->argv[4],LIST_HEAD);
} else {
addReply(c,shared.syntaxerr);
}
}
//返回列表长度
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}
//返回index上的元素
void lindexCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = NULL;

if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistEntry entry;
if (quicklistIndex(o->ptr, index, &entry)) {
if (entry.value) {
value = createStringObject((char*)entry.value,entry.sz);
} else {
value = createStringObjectFromLongLong(entry.longval);
}
addReplyBulk(c,value);
decrRefCount(value);
} else {
addReply(c,shared.nullbulk);
}
} else {
serverPanic("Unknown list encoding");
}
}
//将列表index位置上的值进行替换
void lsetCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = c->argv[3];

if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
int replaced = quicklistReplaceAtIndex(ql, index,
value->ptr, sdslen(value->ptr));
if (!replaced) {
addReply(c,shared.outofrangeerr);
} else {
addReply(c,shared.ok);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++;
}
} else {
serverPanic("Unknown list encoding");
}
}
//弹出命令的通用函数
void popGenericCommand(client *c, int where) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;

robj *value = listTypePop(o,where);
if (value == NULL) {
addReply(c,shared.nullbulk);
} else {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {//如果列表长度为0
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}

void lpopCommand(client *c) {
popGenericCommand(c,LIST_HEAD);
}

void rpopCommand(client *c) {
popGenericCommand(c,LIST_TAIL);
}
//遍历链表
void lrangeCommand(client *c) {
robj *o;
long start, end, llen, rangelen;

if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);

/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);

while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
serverPanic("List encoding is not QUICKLIST!");
}
}
//截断链表的一部分
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;

if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);

/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
/* Out of range start or start > end result in empty list */
ltrim = llen;
rtrim = 0;
} else {
if (end >= llen) end = llen-1;
ltrim = start;
rtrim = llen-end-1;
}

/* Remove list elements to perform the trim */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistDelRange(o->ptr,0,ltrim);
quicklistDelRange(o->ptr,-rtrim,rtrim);
} else {
serverPanic("Unknown list encoding");
}

notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
//删除链表中的指定元素
void lremCommand(client *c) {
robj *subject, *obj;
obj = c->argv[3];
long toremove;
long removed = 0;

if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
return;

subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

listTypeIterator *li;
if (toremove < 0) {
toremove = -toremove;
li = listTypeInitIterator(subject,-1,LIST_HEAD);
} else {
li = listTypeInitIterator(subject,0,LIST_TAIL);
}

listTypeEntry entry;
while (listTypeNext(li,&entry)) {
if (listTypeEqual(&entry,obj)) {
listTypeDelete(li, &entry);
server.dirty++;
removed++;
if (toremove && removed == toremove) break;
}
}
listTypeReleaseIterator(li);

if (removed) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"lrem",c->argv[1],c->db->id);
}

if (listTypeLength(subject) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}

addReplyLongLong(c,removed);
}

void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createQuicklistObject();
quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,dstkey,dstobj);
}
signalModifiedKey(c->db,dstkey);
listTypePush(dstobj,value,LIST_HEAD);
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
//头插尾删和尾删头插
void rpoplpushCommand(client *c) {
robj *sobj, *value;
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,sobj,OBJ_LIST)) return;

if (listTypeLength(sobj) == 0) {
/* This may only happen after loading very old RDB files. Recent
* versions of Redis delete keys of empty lists. */
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];

if (dobj && checkType(c,dobj,OBJ_LIST)) return;
value = listTypePop(sobj,LIST_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
* may change the client command argument vector (it does not
* currently). */
incrRefCount(touchedkey);
rpoplpushHandlePush(c,c->argv[2],dobj,value);

/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);

/* Delete the source list when it is empty */
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
if (listTypeLength(sobj) == 0) {
dbDelete(c->db,touchedkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
touchedkey,c->db->id);
}
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
}
}
//下方是block相关操作的代码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐