您的位置:首页 > 数据库 > Redis

魔改redis之添加命令hrandmember

2020-12-27 22:33 405 查看

魔改redis之添加命令hrandmember

目录
  • 参考文献
  • 正文

    前言

    想从

    redis
    hash
    表获取随机的键值对,但是发现
    redis
    只支持
    set
    的随机值SRANDMEMBER。但是如果把
    hash
    表中的数据又存一份,占用的空间又太大。也可以通过先
    HLEN
    获取
    hash
    表的大小,随机出一个偏移值,再调用
    HSCAN
    获得一组数据。或者直接多次随机,多次取值。但这样效率始终不如
    SRANDMEMBER
    (
    redis
    的开销主要是网络的开销)。于是想到魔改
    redis
    代码,使其支持对
    hash
    表的随机键值对。客户端
    jedis
    打算使用
    eval
    来调用
    redis
    中新加入的指令。

    Set类型与srandmember命令

    Set
    类型的编码可以是
    OBJ_ENCODING_HT
    或者
    OBJ_ENCODING_INTSET
    ,如果集合中的值全是数值,那么
    Set
    的编码(底层类型)为
    OBJ_ENCODING_INTSET
    , 如果加入了无法被解析为数值的字符串,或者
    set
    的大小超过了
    OBJ_SET_MAX_INTSET_ENTRIES
    默认
    512
    ,编码则会变更为
    OBJ_ENCODING_HT

    OBJ_ENCODING_INTSET
    就是存储着整数的有序数组。加入新值时新
    realloc
    新增内存,再使用
    memmove
    将对应位置后的数据后移,然后在对应的位置加入值。

    OBJ_ENCODING_HT
    编码就是
    dict
    类型,也就是字典。

    srandmember
    命令的主要处理函数是
    srandmemberWithCountCommand
    ,如果传入的
    count
    值是负数,意味着值可以重复。

    1. 如果值可以重复,那么每次随机取出一个成员。

    2. 如果

      set
      size
      小于请求的数量,则返回
      set
      集合中全部的值。

      //case 1
      if (!uniq) {
      addReplyMultiBulkLen(c,count);
      while(count--) {
      
      encoding = setTypeRandomElement(set,&ele,&llele);
      if (encoding == OBJ_ENCODING_INTSET) {
      addReplyBulkLongLong(c,llele);
      } else {
      addReplyBulkCBuffer(c,ele,sdslen(ele));
      }
      }
      return;
      }
      
      //case 2
      if (count >= size) {
      sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);
      return;
      }
    3. 集合的数量没有远远大于请求的数量。将

      set
      的值复制到
      dict
      中,然后随机删除值,直到数量等于请求的值。

    4. 集合数量远大请求的数量。随机取值,加入

      dict
      中,数量满足后返回
      dict
      中的值。

      if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
      setTypeIterator *si;
      
      /* Add all the elements into the temporary dictionary. */
      si = setTypeInitIterator(set);
      while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
      int retval = DICT_ERR;
      
      if (encoding == OBJ_ENCODING_INTSET) {
      retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
      } else {
      retval = dictAdd(d,createStringObject(ele,sdslen(ele)),NULL);
      }
      serverAssert(retval == DICT_OK);
      }
      setTypeReleaseIterator(si);
      serverAssert(dictSize(d) == size);
      
      /* Remove random elements to reach the right count. */
      while(size > count) {
      dictEntry *de;
      
      de = dictGetRandomKey(d);
      dictDelete(d,dictGetKey(de));
      size--;
      }
      }
      
      else {
      unsigned long added = 0;
      robj *objele;
      
      while(added < count) {
      encoding = setTypeRandomElement(set,&ele,&llele);
      if (encoding == OBJ_ENCODING_INTSET) {
      objele = createStringObjectFromLongLong(llele);
      } else {
      objele = createStringObject(ele,sdslen(ele));
      }
      /* Try to add the object to the dictionary. If it already exists
      
      1044
      * free it, otherwise increment the number of objects we have
      * in the result dictionary. */
      if (dictAdd(d,objele,NULL) == DICT_OK)
      added++;
      else
      decrRefCount(objele);
      }
      }
      
      /* CASE 3 & 4: send the result to the user. */
      {
      dictIterator *di;
      dictEntry *de;
      
      addReplyMultiBulkLen(c,count);
      di = dictGetIterator(d);
      while((de = dictNext(di)) != NULL)
      addReplyBulk(c,dictGetKey(de));
      dictReleaseIterator(di);
      dictRelease(d);
      }

    Hash类型对比Set类型

    Hash类型和Set类型的关系非常密切,在

    java
    源码中,往往
    set
    类型就是由
    hash
    类型实现的。在
    redis
    中在数据量较大的时候也十分相似。

    前文提到

    Set
    类型的编码可以是
    intset
    或者是
    dict
    ziplist
    的编码是
    ziplist
    或者是
    dict
    。在当前的redis版本中,还并没有添加
    hrandmember
    命令(6.2及之前)。

    ziplist
    中的字符串长度超过
    OBJ_HASH_MAX_ZIPLIST_VALUE
    (默认值为64),或者
    entry
    的个数超过
    OBJ_HASH_MAX_ZIPLIST_ENTRIES
    (默认值为512),则会转化为
    hashtable
    编码。

    ziplist
    encoding
    就是尝试将字符串值解析成
    long
    并保存编码。
    hash
    类型和
    set
    类型最大的区别在于元素个数较少时,内部的编码不同,
    hash
    内部的编码是
    ziplist
    ,而
    set
    的内部编码是
    intset
    ,(个人认为
    hash
    intset
    内部编码不统一是一处失误,使用者对两者有着相似的用法,也就是需求类似,然而底层实现却不同,必然导致代码的重复,也确实如此,
    redis
    团队似乎因为这个原因迟迟没有添加
    hrandmember
    命令)

    hrandmember命令

    因为

    ziplist
    不能被随机访问。对于
    ziplist
    编码的
    hash
    表,我们采用以下算法,来保证每个被取出来的
    entry
    的概率是一样的。

    我们从长度为m的

    ziplist
    中取出n个entry,m>=n,设剩下的长度为m left ,剩余要取的个数为nleft,每次取球时,我们取它的概率为 nleft/m left 。

    这样能保证每个球被取出的概率相同,为

    n/m
    。可用数学归纳法证明。

    通过使用这种方式,我们将时间复杂度从O(nm)降为O(m)。

    处理

    hash
    编码,我们复制
    srandmember
    的代码,并稍作修改,避免字符串的复制以提高效率。

    注意:当编码为

    ziplist
    时,不支持负数的count。虽然也有返回值,但并不会重复,并且个数小于期望值。

    void hrandmemberWithCountCommand(client *c, long l) {
    unsigned long entryCount, hashSize;
    int uniq = 1;
    hashTypeIterator *hi;
    robj *hash;
    dict *d;
    double randomDouble;
    double threshold;
    unsigned long index = 0;
    
    if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
    == NULL || checkType(c,hash,OBJ_HASH)) return;
    
    if(l >= 0) {
    entryCount = (unsigned long) l;
    } else {
    entryCount = -l;
    uniq = 0;
    }
    
    hashSize = hashTypeLength(hash);
    if(entryCount > hashSize)
    entryCount = hashSize;
    addReplyMapLen(c, entryCount);
    hi = hashTypeInitIterator(hash);
    
    if(hash->encoding == OBJ_ENCODING_ZIPLIST) {
    while (hashTypeNext(hi) != C_ERR && entryCount != 0) {
    randomDouble = ((double)rand()) / RAND_MAX;
    threshold = ((double)entryCount) / (hashSize - index);
    if(randomDouble &
    564
    lt; threshold){
    entryCount--;
    addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
    addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
    }
    
    index ++;
    }
    } else {
    // copy of srandmember
    if(!uniq) {
    while(entryCount--) {
    sds key, value;
    
    dictEntry *de = dictGetRandomKey(hash->ptr);
    key = dictGetKey(de);
    value = dictGetVal(de);
    addReplyBulkCBuffer(c,key,sdslen(key));
    addReplyBulkCBuffer(c,value,sdslen(value));
    }
    return;
    }
    
    if(entryCount >= hashSize) {
    
    while (hashTypeNext(hi) != C_ERR) {
    addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
    addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
    }
    return;
    }
    
    static dictType dt = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    NULL,                       /* key destructor */
    NULL,                       /* val destructor */
    NULL
    ad8
    /* allow to expand */
    };
    d = dictCreate(&dt,NULL);
    
    if(entryCount * HRANDMEMBER_SUB_STRATEGY_MUL > hashSize) {
    
    /* Add all the elements into the temporary dictionary. */
    while((hashTypeNext(hi)) != C_ERR) {
    int ret = DICT_ERR;
    sds key, value;
    
    key = hashTypeCurrentFromHashTable(hi,OBJ_HASH_KEY);
    value = hashTypeCurrentFromHashTable(hi,OBJ_HASH_VALUE);
    ret = dictAdd(d, key, value);
    
    serverAssert(ret == DICT_OK);
    }
    serverAssert(dictSize(d) == hashSize);
    
    /* Remove random elements to reach the right count. */
    while(hashSize > entryCount) {
    dictEntry *de;
    
    de = dictGetRandomKey(d);
    dictDelete(d,dictGetKey(de));
    hashSize--;
    }
    }
    
    else {
    unsigned long added = 0;
    sds sdsKey, sdsVal;
    
    while(added < entryCount) {
    dictEntry *de = dictGetRandomKey(hash->ptr);
    sdsKey = dictGetKey(de);
    sdsVal = dictGetVal(de);
    
    /* Try to add the object to the dictionary. If it already exists
    * free it, otherwise increment the number of objects we have
    * in the result dictionary. */
    if (dictAdd(d,sdsKey,sdsVal) == DICT_OK){
    added++;
    }
    }
    }
    
    {
    dictIterator *di;
    dictEntry *de;
    di = dictGetIterator(d);
    while((de = dictNext(di)) != NULL) {
    sds key = dictGetKey(de);
    sds value = dictGetVal(de);
    addReplyBulkCBuffer(c,key,sdslen(key));
    addReplyBulkCBuffer(c,value,sdslen(value));
    }
    
    dictReleaseIterator(di);
    dictRelease(d);
    }
    
    }
    hashTypeReleaseIterator(hi);
    }

    参考文献

    srandmember

    redis源码

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: