memcached学习之item部分
2016-06-16 18:37
666 查看
item部分的功能介绍
item部分是memcached的存储结构,所有的读写修改等操作都是通过对item的处理来进行的,另外对于缓存来说,其能够使用的内存空间是有限的,如果将请求数少或者过期的数据清理后将内存腾出来也是必备的一个功能,这里面memcached的策略主要就是LRU方式和lazy expiration方法,这篇里面只会涉及lazy expiration方法;不考虑LRU方法的原因是作为缓存系统核心的方法,会在后面单独起一篇专门介绍。item部分的整体逻辑
通过前面assoc和slabs部分介绍可以知道,assoc是为了快速查找数据而存在的,slabs是数据存放的地方,那数据本身就是item;因此item会存放数据的相关信息,比如时间、类别、k\v等,对数据的操作无非是增删改查,这些操作都是要传递key信息的,所以对item操作的一般步骤都是:根据key值查找hashtable;
hashtable会返回该key对应的item的地址,注意是地址,直接找到的就是slabs中的存储位置(新增数据这一步没有);
根据找的item的地址,根据item的存储结构,执行对响应字段修改。
下面会介绍对item的操作函数,这里面稍微需要先知道一点,得到的item的地址链接到hashtable中是通过do_item_link和do_item_unlink来完成的。
item部分的代码分解
自定义类型
typedef struct _stritem { /* Protected by LRU locks */ struct _stritem *next; struct _stritem *prev; //这两个用于LRU,暂时不管 /* Rest are protected by an item lock */ //用于hashtable中建立链表 struct _stritem *h_next; /* hash chain next */ //记录最近的存取时间 rel_time_t time; /* least recent access */ //设定的过期时间 rel_time_t exptime; /* expire time */ //数据大小 int nbytes; /* size of data */ unsigned short refcount; //引用次数 uint8_t nsuffix; /* length of flags-and-length string */ //item的标记 uint8_t it_flags; /* ITEM_* above */ //应该属于slabclass的层级 uint8_t slabs_clsid;/* which slab class we're in */ //key的长度, uint8_t nkey; /* key length, w/terminating null and padding */ /* this odd type prevents type-punning issues when we do * the little shuffle to save space when not using CAS. */ union { uint64_t cas; //cas数,用于一致性,校验使用 char end; } data[]; /* if it_flags & ITEM_CAS we have 8 bytes CAS */ /* then null-terminated key */ /* then " flags length\r\n" (no terminating null) */ /* then data with terminating \r\n (no terminating null; it's binary!) */ } item;
变量分解
代码分解
1、item_is_flushed函数,用来判断item是否被冲洗;int item_is_flushed(item *it) { rel_time_t oldest_live = settings.oldest_live; //设置的item最老的存在时间 uint64_t cas = ITEM_get_cas(it); //获取item的cas uint64_t oldest_cas = settings.oldest_cas; //设置的最老的cas if (oldest_live == 0 || oldest_live > current_time) return 0; //如果没有设置oldest_live,或者最老的却比现在的时间还早,则没有冲洗 if ((it->time <= oldest_live) || (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) { return 1; //如果item的最后访问时间在最老时间之前,或者cas存在且item的cas在最老的cas之前,则应该被冲洗 } return 0; }
2、item_make_header函数,用来计算对象信息头的大小;
static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes, char *suffix, uint8_t *nsuffix) { /* suffix is defined at 40 chars elsewhere.. */ *nsuffix = (uint8_t) snprintf(suffix, 40, " %d %d\r\n", flags, nbytes - 2); //这里是计算(flag, value)大小信息所占用的字节数,nbytes包括value的长度和\r\n的长度 return sizeof(item) + nkey + *nsuffix + nbytes; }
3、do_item_alloc函数,执行为当前数据分配item过程,实际上就是从slabs获取一个内存地址,存放当前的数据,注意一点是,这里面仅仅是将item的地址返回了,在代码中其实可以看到,分配了地址并没有用,因为没有将地址添加到hashtable中去,这样即使添加了数据,也无法查找,真正添加的过程是在store_item函数中;
item *do_item_alloc(char *key, const size_t nkey, const int flags,const rel_time_t exptime, const 4000 int nbytes, const uint32_t cur_hv) { int i; uint8_t nsuffix; item *it = NULL; char suffix[40]; unsigned int total_chunks; size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix); //得到所需要的内存大小 if (settings.use_cas) { //为了简化分析,不考虑CAS策略 ntotal += sizeof(uint64_t); } unsigned int id = slabs_clsid(ntotal); //获取可以存放该item的slabclass层级 if (id == 0) return 0; //没有找到可以存放,直接返回 /* If no memory is available, attempt a direct LRU juggle/eviction */ /* This is a race in order to simplify lru_pull_tail; in cases where * locked items are on the tail, you want them to fall out and cause * occasional OOM's, rather than internally work around them. * This also gives one fewer code path for slab alloc/free */ //这里面说的是如果没有内存可用,则可以通过LRU算法来更新,这里面 //暂时不分析涉及LRU过程的代码,因为会单独开辟一个章节来详细介绍 for (i = 0; i < 5; i++) { /* Try to reclaim memory first */ if (!settings.lru_maintainer_thread) { lru_pull_tail(id, COLD_LRU, 0, false, cur_hv); } it = slabs_alloc(ntotal, id, &total_chunks, 0); //这里去获取一个可以存放当前数据的内存地址 if (settings.expirezero_does_not_evict) total_chunks -= noexp_lru_size(id); if (it == NULL) { if (settings.lru_maintainer_thread) { lru_pull_tail(id, HOT_LRU, total_chunks, false, cur_hv); lru_pull_tail(id, WARM_LRU, total_chunks, false, cur_hv); lru_pull_tail(id, COLD_LRU, total_chunks, true, cur_hv); } else { lru_pull_tail(id, COLD_LRU, 0, true, cur_hv); } } else { break; } } if (i > 0) { pthread_mutex_lock(&lru_locks[id]); itemstats[id].direct_reclaims += i; pthread_mutex_unlock(&lru_locks[id]); } if (it == NULL) { pthread_mutex_lock(&lru_locks[id]); itemstats[id].outofmemory++; pthread_mutex_unlock(&lru_locks[id]); return NULL; } assert(it->slabs_clsid == 0); //assert(it != heads[id]); /* Refcount is seeded to 1 by slabs_alloc() */ it->next = it->prev = it->h_next = 0; //初始化item /* Items are initially loaded into the HOT_LRU. This is '0' but I want at * least a note here. Compiler (hopefully?) optimizes this out. */ if (settings.lru_maintainer_thread) { if (exptime == 0 && settings.expirezero_does_not_evict) { id |= NOEXP_LRU; } else { id |= HOT_LRU; } } else { /* There is only COLD in compat-mode */ id |= COLD_LRU; } it->slabs_clsid = id; //设置该item存在的对应slabclass层 DEBUG_REFCNT(it, '*'); it->it_flags = settings.use_cas ? ITEM_CAS : 0;//设置标记 it->nkey = nkey; //设置key的长度 it->nbytes = nbytes; //设置VALUE的长度 memcpy(ITEM_key(it), key, nkey); //将key内容拷贝到内存 it->exptime = exptime; //设置过期时间 memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix); //拷贝前缀到内存 it->nsuffix = nsuffix; //设置前缀的长度 return it; //返回item地址 }
4、item_free函数,释放item;
void item_free(item *it) { size_t ntotal = ITEM_ntotal(it); //计算占用的内存大小 unsigned int clsid; assert((it->it_flags & ITEM_LINKED) == 0); assert(it != heads[it->slabs_clsid]); assert(it != tails[it->slabs_clsid]); assert(it->refcount == 0); /* so slab size changer can tell later if item is already free or not */ clsid = ITEM_clsid(it); //计算对应slabclass层级 DEBUG_REFCNT(it, 'F'); slabs_free(it, ntotal, clsid); //释放该内存到空闲列表 }
5、item_size_ok函数,用来确定当前内存是否适合该数据的长度;
bool item_size_ok(const size_t nkey, const int flags, const int nbytes) { char prefix[40]; uint8_t nsuffix; size_t ntotal = item_make_header(nkey + 1, flags, nbytes, prefix, &nsuffix); if (settings.use_cas) { ntotal += sizeof(uint64_t); } return slabs_clsid(ntotal) != 0; }
6、do_item_link函数,用于将item绑定到hashtable和LRU数组中去;
int do_item_link(item *it, const uint32_t hv) { MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes); assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0); it->it_flags |= ITEM_LINKED; //修改为已绑定 it->time = current_time; //访问时间更新 STATS_LOCK(); //状态信息不关心 stats.curr_bytes += ITEM_ntotal(it); stats.curr_items += 1; stats.total_items += 1; STATS_UNLOCK(); /* Allocate a new CAS ID on link. */ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); //设置cas assoc_insert(it, hv); //插入到hashtable中 item_link_q(it); //添加到LRU数组heads和tails refcount_incr(&it->refcount); return 1; }
7、do_item_unlink函数,将item从hashtable中移除;
void do_item_unlink(item *it, const uint32_t hv) { MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes); if ((it->it_flags & ITEM_LINKED) != 0) { it->it_flags &= ~ITEM_LINKED; //修改标记位 STATS_LOCK(); //状态信息不关心 stats.curr_bytes -= ITEM_ntotal(it); stats.curr_items -= 1; STATS_UNLOCK(); assoc_delete(ITEM_key(it), it->nkey, hv); //从hashtable中删除 item_unlink_q(it); //从LRU数组heads和tails数组中移除 do_item_remove(it); //释放item占用内存 } }
8、do_item_remove函数,执行item内存释放过程;
void do_item_remove(item *it) { MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes); assert((it->it_flags & ITEM_SLABBED) == 0); assert(it->refcount > 0); if (refcount_decr(&it->refcount) == 0) { //等待引用计数减到0才释放 item_free(it); } }
9、do_item_update函数,执行item的更新操作;
void do_item_update(item *it) { MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes); if (it->time < current_time - ITEM_UPDATE_INTERVAL) { //只有当item在允许间隔之后才更新 assert((it->it_flags & ITEM_SLABBED) == 0); if ((it->it_flags & ITEM_LINKED) != 0) { it->time = current_time; if (!settings.lru_maintainer_thread) { item_unlink_q(it); //更新的是LRU数组,如果有lru线程,不需要自己进行更新 item_link_q(it); } } } }
10、do_item_replace函数,执行item的替换过程;
int do_item_replace(item *it, item *new_it, const uint32_t hv) { MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes, ITEM_key(new_it), new_it->nkey, new_it->nbytes); assert((it->it_flags & ITEM_SLABBED) == 0); do_item_unlink(it, hv); //首先移除旧的item return do_item_link(new_it, hv); //添加新的item }
11、do_item_get函数,用于根据key的值获取对应的item;
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { item *it = assoc_find(key, nkey, hv); //通过hashtable找到item的地址 if (it != NULL) { refcount_incr(&it->refcount); //增加引用计数 /* Optimization for slab reassignment. prevents popular items from * jamming in busy wait. Can only do this here to satisfy lock order d29c * of item_lock, slabs_lock. */ /* This was made unsafe by removal of the cache_lock: * slab_rebalance_signal and slab_rebal.* are modified in a separate * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start = * NULL (0), but slab_end is still equal to some value, this would end * up unlinking every item fetched. * This is either an acceptable loss, or if slab_rebalance_signal is * true, slab_start/slab_end should be put behind the slabs_lock. * Which would cause a huge potential slowdown. * Could also use a specific lock for slab_rebal.* and * slab_rebalance_signal (shorter lock?) */ /*if (slab_rebalance_signal && ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) { do_item_unlink(it, hv); do_item_remove(it); it = NULL; }*/ } int was_found = 0; if (settings.verbose > 2) { int ii; if (it == NULL) { fprintf(stderr, "> NOT FOUND "); } else { fprintf(stderr, "> FOUND KEY "); was_found++; } for (ii = 0; ii < nkey; ++ii) { fprintf(stderr, "%c", key[ii]); } } //这里说明下,memcached采用两种更新方式,一个是LRU算法,还有一个就是lazy expiration,也就是等到访问该item的时候判断是否过期 if (it != NULL) { if (item_is_flushed(it)) { //判断该item是否应该被冲洗 do_item_unlink(it, hv); //从hashtable和LRU数组中移除该item do_item_remove(it); //释放该item的内存 it = NULL; if (was_found) { fprintf(stderr, " -nuked by flush"); } } else if (it->exptime != 0 && it->exptime <= current_time) { //item超过设定的存活时间,移除! do_item_unlink(it, hv); do_item_remove(it); it = NULL; if (was_found) { fprintf(stderr, " -nuked by expire"); } } else { it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE; DEBUG_REFCNT(it, '+'); } } if (settings.verbose > 2) fprintf(stderr, "\n"); return it; }
12、do_item_touch函数,修改下存活时间;
item *do_item_touch(const char *key, size_t nkey, uint32_t exptime, const uint32_t hv) { item *it = do_item_get(key, nkey, hv); if (it != NULL) { it->exptime = exptime; //修改存活时间 } return it; }
13、do_store_item函数,真正实现存储,也就是连接到hashtable中去;
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) { char *key = ITEM_key(it); item *old_it = do_item_get(key, it->nkey, hv); //查找key对应的item的地址 enum store_item_type stored = NOT_STORED; item *new_it = NULL; int flags; if (old_it != NULL && comm == NREAD_ADD) { /* add only adds a nonexistent item, but promote to head of LRU */ //原来存在数据,且是ADD命令,执行更新操作 do_item_update(old_it); } else if (!old_it && (comm == NREAD_REPLACE || comm == NREAD_APPEND || comm == NREAD_PREPEND)) { /* replace only replaces an existing value; don't store */ } else if (comm == NREAD_CAS) { /* validate cas operation */ if(old_it == NULL) { // LRU expired stored = NOT_FOUND; pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.cas_misses++; pthread_mutex_unlock(&c->thread->stats.mutex); } else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) { // cas validates // it and old_it may belong to different classes. // I'm updating the stats for the one that's getting pushed out pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; pthread_mutex_unlock(&c->thread->stats.mutex); item_replace(old_it, it, hv); stored = STORED; } else { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++; pthread_mutex_unlock(&c->thread->stats.mutex); if(settings.verbose > 1) { fprintf(stderr, "CAS: failure: expected %llu, got %llu\n", (unsigned long long)ITEM_get_cas(old_it), (unsigned long long)ITEM_get_cas(it)); } stored = EXISTS; } } else { /* * Append - combine new and old record into single one. Here it's * atomic and thread-safe. */ if (comm == NREAD_APPEND || comm == NREAD_PREPEND) { /* * Validate CAS */ if (ITEM_get_cas(it) != 0) { // CAS much be equal if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) { stored = EXISTS; } } if (stored == NOT_STORED) { /* we have it and old_it here - alloc memory to hold both */ /* flags was already lost - so recover them from ITEM_suffix(it) */ flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10); new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv); if (new_it == NULL) { /* SERVER_ERROR out of memory */ if (old_it != NULL) do_item_remove(old_it); return NOT_STORED; } /* copy data from it and old_it to new_it */ if (comm == NREAD_APPEND) { memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes); memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes); } else { /* NREAD_PREPEND */ memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes); memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes); } it = new_it; } } if (stored == NOT_STORED) { if (old_it != NULL) item_replace(old_it, it, hv); else do_item_link(it, hv); //这里面才是真正添加到hashtable中的执行者 c->cas = ITEM_get_cas(it); stored = STORED; } } if (old_it != NULL) do_item_remove(old_it); /* release our reference */ if (new_it != NULL) do_item_remove(new_it); if (stored == STORED) { c->cas = ITEM_get_cas(it); } return stored; }
相关文章推荐
- 实战Memcached缓存系统
- Tomcat端口被占用解决方法(不用重启)
- 页面缓存:内存和文件之间的那些事
- “传奇”图象数据存储方式
- Redis和Memcached的区别详解
- 浅析SQL Server中的执行计划缓存(上)
- Enterprise Library for .NET Framework 2.0缓存使用实例
- 超大数据量存储常用数据库分表分库算法总结
- PowerShell中编程清空IE缓存方法
- PowerShell中使用.NET将程序集加入全局程序集缓存
- SQL Server误区30日谈 第18天 有关FileStream的存储,垃圾回收以及其它
- C#中缓存的基本用法总结
- C++实现图的邻接表存储和广度优先遍历实例分析
- 详解Android文件存储
- Android实现图片异步加载并缓存到本地
- wap开发中如何有效的利用缓存减少消息的传送量
- PHP基于文件存储实现缓存的方法
- smarty缓存用法分析
- 在ASP.NET 2.0中操作数据之五十九:使用SQL缓存依赖项SqlCacheDependency
- 在ASP.NET 2.0中操作数据之五十八:在程序启动阶段缓存数据