您的位置:首页 > 数据库 > Memcache

memcached学习笔记(三)

2016-03-15 22:14 246 查看
学习笔记(二)中已经分析过:Memcached从网络读取数据并完成解析后,如果是Get命令,就会执行Get操作。下面分析Get操作的流程。

/*
 * Looks up the item stored under `key` (`nkey` bytes long) while holding the
 * item lock selected by the key's hash value.
 *
 * The lookup itself is done by do_item_get(), which lazily expires items as
 * needed, so an item that has been marked as expired is not returned.
 * Returns the item, or NULL when do_item_get() yields nothing.
 */
item *item_get(const char *key, const size_t nkey) {
    /* The hash value picks both the hash bucket and the lock stripe. */
    const uint32_t hv = hash(key, nkey);
    item *found;

    item_lock(hv);
    found = do_item_get(key, nkey, hv);
    item_unlock(hv);

    return found;
}
/*
 * Locking rules for items:
 *  - item_lock() must be held for an item before any modification to either
 *    its associated hash bucket or the item structure itself.
 *  - LRU modifications must hold the item lock and the LRU lock.
 *  - LRUs accessing items must item_trylock() before modifying an item.
 *  - Items accessible from an LRU must not be freed or modified without
 *    first locking and removing them from the LRU.
 *
 * item_lock() acquires the striped lock that covers hash value `hv`: the
 * stripe index is `hv` masked with hashmask(item_lock_hashpower).
 */
void item_lock(uint32_t hv) {
    const uint32_t stripe = hv & hashmask(item_lock_hashpower);

    mutex_lock(&item_locks[stripe]);
}
/*
 * Releases the striped lock that covers hash value `hv`; the stripe index is
 * computed exactly as in item_lock().
 */
void item_unlock(uint32_t hv) {
    const uint32_t stripe = hv & hashmask(item_lock_hashpower);

    mutex_unlock(&item_locks[stripe]);
}


以上三个函数都在thread.c文件中;do_item_get(key, nkey, hv)函数位于items.c文件的第674行。

/**
 * Wrapper around assoc_find() which does the lazy expiration logic.
 *
 * Looks the key up in the hash table and, when an item is found, takes a
 * reference on it. An item for which is_flushed() is true, or whose non-zero
 * exptime is not later than current_time, is unlinked, has that reference
 * dropped again, and is reported as a miss.
 *
 * @param key   key bytes; exactly nkey bytes are read
 * @param nkey  key length in bytes
 * @param hv    hash value of the key, computed by the caller
 * @return the item, with ITEM_FETCHED|ITEM_ACTIVE set and its refcount
 *         incremented, or NULL when the key is absent, flushed or expired.
 *
 * item_get() calls this with item_lock(hv) held.
 */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
    item *it = assoc_find(key, nkey, hv);   /* hash-table lookup */
    int was_found = 0;   /* set only when the verbose trace reported a hit */

    if (it != NULL) {
        refcount_incr(&it->refcount);   /* reference handed to the caller */
        /* Optimization for slab reassignment. prevents popular items from
         * jamming in busy wait. Can only do this here to satisfy lock order
         * of item_lock, slabs_lock. */
        /* This was made unsafe by removal of the cache_lock:
         * slab_rebalance_signal and slab_rebal.* are modified in a separate
         * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
         * NULL (0), but slab_end is still equal to some value, this would end
         * up unlinking every item fetched.
         * This is either an acceptable loss, or if slab_rebalance_signal is
         * true, slab_start/slab_end should be put behind the slabs_lock.
         * Which would cause a huge potential slowdown.
         * Could also use a specific lock for slab_rebal.* and
         * slab_rebalance_signal (shorter lock?)
         */
        /*if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
        }*/
    }

    /* Debug trace only; it has no influence on the lookup result. */
    if (settings.verbose > 2) {
        if (it == NULL) {
            fprintf(stderr, "> NOT FOUND ");
        } else {
            fprintf(stderr, "> FOUND KEY ");
            was_found++;
        }
        /* Write the key in one call. This emits the same nkey bytes as a
         * per-character "%c" loop, without comparing a signed loop index
         * against the unsigned nkey. The result is ignored on purpose:
         * this is best-effort diagnostics. */
        (void)fwrite(key, 1, nkey, stderr);
    }

    if (it != NULL) {
        /* NOTE(review): is_flushed() is defined elsewhere; judging by the
         * trace message below it presumably detects items invalidated by a
         * flush - confirm. */
        if (is_flushed(it)) {
            do_item_unlink(it, hv);   /* unlink the stale item ... */
            do_item_remove(it);       /* ... and drop the reference taken above */
            it = NULL;
            if (was_found) {
                fprintf(stderr, " -nuked by flush");
            }
        } else if (it->exptime != 0 && it->exptime <= current_time) {
            /* exptime == 0 never matches this branch; otherwise the item is
             * expired once current_time has reached its exptime. */
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            if (was_found) {
                fprintf(stderr, " -nuked by expire");
            }
        } else {
            /* Live item: flag it as fetched and active. */
            it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE;
            DEBUG_REFCNT(it, '+');
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, "\n");

    return it;
}


Set操作对应的函数是store_item(item *item, int comm, conn *c),该函数位于thread.c文件的第590行。

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics).
 *
 * Hashes the item's key, takes the item lock selected by that hash value,
 * delegates the actual work to do_store_item() and releases the lock again.
 * Returns whatever do_store_item() reports.
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    /* The hash value picks both the hash bucket and the lock stripe. */
    const uint32_t hv = hash(ITEM_key(item), item->nkey);
    enum store_item_type outcome;

    item_lock(hv);
    outcome = do_store_item(item, comm, c, hv);
    item_unlock(hv);

    return outcome;
}


其中,do_store_item(item, comm, c, hv)函数位于memcached.c文件的第2303行。
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. comm is compared against NREAD_ADD, NREAD_REPLACE, NREAD_APPEND,
 * NREAD_PREPEND and NREAD_CAS; any other value takes the plain-store path.
 *
 * Locking: store_item() calls this with item_lock(hv) held.
 *
 * it   - the new item carrying the key and the incoming data
 * comm - which store command is being executed (see above)
 * c    - connection; its per-thread CAS statistics are updated and c->cas
 *        receives the CAS id of the item that ends up stored
 * hv   - hash value of the item's key
 *
 * Returns the state of storage: STORED, NOT_STORED, EXISTS or NOT_FOUND.
 */
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
char *key = ITEM_key(it);
item *old_it = do_item_get(key, it->nkey, hv); // existing item for this key, or NULL if there is none; holds a reference that is released below
enum store_item_type stored = NOT_STORED; // result code; stays NOT_STORED unless a branch below changes it

item *new_it = NULL;
int flags;

if (old_it != NULL && comm == NREAD_ADD) {// add, but the key already exists
/* add only adds a nonexistent item, but promote to head of LRU */
do_item_update(old_it); // nothing is stored; see the comment above
} else if (!old_it && (comm == NREAD_REPLACE
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
} else if (comm == NREAD_CAS) {// cas command
/* validate cas operation */
if(old_it == NULL) {
// LRU expired
stored = NOT_FOUND;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.cas_misses++;// per-thread stats counter, guarded by stats.mutex
pthread_mutex_unlock(&c->thread->stats.mutex);
}
else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {// old_it exists and the CAS ids match
// cas validates
// it and old_it may belong to different classes.
// I'm updating the stats for the one that's getting pushed out
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;// per-thread stats counter, guarded by stats.mutex
pthread_mutex_unlock(&c->thread->stats.mutex);

item_replace(old_it, it, hv); // put the new item in place of the old one
stored = STORED;
} else {
/* old_it exists but the CAS ids differ */
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
pthread_mutex_unlock(&c->thread->stats.mutex);

if(settings.verbose > 1) {
fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
(unsigned long long)ITEM_get_cas(old_it),
(unsigned long long)ITEM_get_cas(it));
}
stored = EXISTS; // CAS mismatch: report EXISTS; the new data is not stored
}
} else {
/*
 * Append - combine new and old record into single one. Here it's
 * atomic and thread-safe.
 */
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {// append/prepend: merge with the existing value
/* old_it is non-NULL here: append/prepend on a missing key was
 * already handled by the "don't store" branch above. */
/*
 * Validate CAS
 */
if (ITEM_get_cas(it) != 0) {
// CAS must be equal
if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {// CAS check failed
stored = EXISTS; // report EXISTS
}
}

if (stored == NOT_STORED) {// no CAS failure: build the combined item
/* we have it and old_it here - alloc memory to hold both */
/* flags were already lost - so recover them from ITEM_suffix(old_it) */

flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv); // allocate room for both payloads

if (new_it == NULL) {
/* SERVER_ERROR out of memory */
if (old_it != NULL)
do_item_remove(old_it); // release our reference on old_it before the early return

return NOT_STORED;
}

/* copy data from it and old_it to new_it */

if (comm == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes); // old payload first
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes); // then the new payload, starting over the old payload's trailing CRLF
} else {
/* NREAD_PREPEND */
memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
}

/* from here on `it` refers to the combined item */
it = new_it;
}
}

if (stored == NOT_STORED) {
if (old_it != NULL) // key already present
item_replace(old_it, it, hv); // replace the old value
else
do_item_link(it, hv); // no previous item: hand the new one to do_item_link()

c->cas = ITEM_get_cas(it);

stored = STORED;
}
}

if (old_it != NULL)
do_item_remove(old_it);         /* release our reference */
/* NOTE(review): presumably drops the reference held on the item returned
 * by do_item_alloc() - confirm against do_item_alloc(). */
if (new_it != NULL)
do_item_remove(new_it);

/* Also covers the cas-command path, which sets STORED without assigning
 * c->cas above. */
if (stored == STORED) {
c->cas = ITEM_get_cas(it);
}

return stored;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  memcached rdma