您的位置:首页 > 数据库 > Memcache

Memcached中item锁的粒度

2015-08-06 19:17 507 查看
在多线程环境中,对于memcached中的item操作,应该对它加锁,如果加所有的item都加一个全局锁,这样对于一把锁来控制所有hash桶中的item,粒度实在是太大了,一个桶的插入删除操作会阻塞另一个桶的操作。如果锁粒度很小的话,例如每个hash桶加锁,那么如果是在hash桶扩容的情况下,一次扩容操作可能涉及到多个桶数据的迁移,这样需要对多个桶的锁进行循环加锁,这样就有些复杂。Memcached的解决方法就是在按照情况进行锁粒度的转换。

      具体的解决办法是:memcached在扩容操作时,加的都是全局锁,就是所有item(所有hash桶中的)都是一把锁,在扩容操作中,item的操作,例如hash表的删除,插入,touch,查询都是去竞争那个全局锁,因为原来的元素在old_table中的元素需要rehash到primary_table,虽然可以在old_table中的每个桶上加锁,但是没法控制primary_table的多进程操作,小于expand_bucket的元素会直接进入primary_table,old_table的元素会按照新的hash值进入到primary中,不能确定rehash到primary_table的哪个桶中,所以这时侯只能获取全局锁。在扩容结束时,item锁被重新切换回hash桶上的锁,这里锁是分段加锁的(几个桶一个锁,这个具体数值取决与初始的worker的数量,worker数量越多,锁越细,越少hash桶公用一个锁)。默认hash桶是的1<<16个hash桶

assoc.c

 62 void assoc_init(const int hashtable_init) {

 63    if (hashtable_init) {
  //初始为空

 64       hashpower = hashtable_init;

 65    }

 66 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));

这里默认的hashtable_init为空,这里的hashpower=HASHPOWER_DEFAULT,等于16

thread.c

786 if (nthreads < 3) { 

787 power = 10;

788 } else if (nthreads < 4) {

789 power = 11;

790 } else if (nthreads < 5) {

791 power = 12;

792 } else {

793 /* 8192 buckets, and central
locks don't scale much past 5 threads */

794 power = 13;

795 }

796 

797 item_lock_count = hashsize(power);

798 

799 item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));

     可以看出在小于3个worker的时候,在默认hash桶的数量下,相隔64个桶一个锁,在4的时候相隔32个桶一把锁,到了6个以上的worker的时候默认情况下相隔8个桶一个锁。

     在每次操作锁的时候会判断下锁的类型,如果全局锁,就对全局加锁,如果是细粒度()的锁,就通过hash值去取得hash桶的锁

124 void item_lock(uint32_t hv) {

125 uint8_t *lock_type = pthread_getspecific(item_lock_type_key); 
 

126 if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {//细粒度的锁

127      mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);

         //取得hash值在去取得相应的锁

128 } else {

129      mutex_lock(&item_global_lock);//全局锁

130 }

131 }

     每个线程的锁的类型存在了每个线程的私有空间中,用函数 pthread_setspecific和 pthread_getspecific取得,每个线程默认锁的类型是分段锁(hash桶锁)

368 static void *worker_libevent(void *arg) {

369 LIBEVENT_THREAD *me = arg;

370 

371 /* Any per-thread setup can happen here; thread_init() will
block until

372 * all threads have finished initializing.

373 */

374 

375 /* set an indexable thread-specific
memory item for the lock type.

376 * this could be unnecessary if we pass the conn *c struct through

377 * all item_lock calls...

378 */

379 me->item_lock_type = ITEM_LOCK_GRANULAR;

380 pthread_setspecific(item_lock_type_key, &me->item_lock_type);

381 

382 register_thread_initialized();

383 

384 event_base_loop(me->base, 0);

385 return NULL;

386 }

       如果在hash空间不够的时候,需要对hash表进行扩容,这时候需要转换item的锁从分段锁转化成为全局锁,由于hash扩容操作是一个单独的线程在做,改变锁的类型需要改变所有worker对item的锁的类型,这时候memcached是通过循环每个worker对他们的pipe的一端写入锁转换的命令,在每个worker其实都有两套libevent
loop,一套用于监听其他线程发给work的信息,例如主线程去监听网络,如果有连接来了,主线程通过这个pipe通知worker,还有就是这个锁的类型的变化,通过pipe通知,另一套loop就是去通过那个连接的fd去轮询网络中的读事件等。

174 void switch_item_lock_type(enum item_lock_types type) {

175 char buf[1];//通过pipe发给worker的char
buf

176 int i;

177 

178 switch (type) {

179    case ITEM_LOCK_GRANULAR:

180    buf[0] = 'l';//转换为分段锁

181    break;

182 case ITEM_LOCK_GLOBAL:

183    buf[0] = 'g';//转换为全局锁

184    break;

185 default:

186 fprintf(stderr, "Unknown lock type: %d\n", type);

187 assert(1 == 0);

188 break;

189 }

190 

191 pthread_mutex_lock(&init_lock);

192 init_count = 0;

193 for (i = 0; i < settings.num_threads; i++) {

194 if (write(threads[i].notify_send_fd, buf, 1) != 1) {

    //向每个worker的notify_send_fd (pipe的一端)写入buf

195 perror("Failed writing to notify pipe");

196 /* TODO: This is a
fatal problem. Can it ever happen temporarily? */

197 }

198 }

199 wait_for_thread_registration(settings.num_threads);//等待每个线程处理完设置完锁的类型

200 pthread_mutex_unlock(&init_lock);

201 }

在thread.c中锁的类型转换:

427 case 'l':

428    me->item_lock_type = ITEM_LOCK_GRANULAR;

429    register_thread_initialized();

430    break;

431 case 'g':

432    me->item_lock_type = ITEM_LOCK_GLOBAL;

433    register_thread_initialized();

434    break;

435 }

通过连接每个worker的管道就设置了锁的类型。

在hash扩容时会加全局锁

204 static void *assoc_maintenance_thread(void *arg) {

205 

206 while (do_run_maintenance_thread) {

207 int ii = 0;

208 

209 /* Lock the cache, and bulk
move multiple buckets to the new

210 * hash table. */

211 item_lock_global();//加的是全局锁

212 mutex_lock(&cache_lock);

      在没有开始扩容时会把锁粒度调整到分段锁,同时打开slab_move,详见前一篇blog,扩容会使线程停留在等待 maintenance_cond上,在hash容量操作了桶的1.5倍后slab_move被停止,同时锁的粒度转换为全局锁,开始hash扩容

244 if (!expanding) {

245 /* finished expanding. tell all threads to use
fine-grained locks */

246 switch_item_lock_type(ITEM_LOCK_GRANULAR);//默认就是分段锁

247 slabs_rebalancer_resume(); //打开slab_move

248 /* We are done expanding.. just
wait for next invocation */

249 mutex_lock(&cache_lock); 

250 started_expanding = false; 

251 pthread_cond_wait(&maintenance_cond, &cache_lock);//在不扩容时,线程会停留在这里

252 /* Before doing anything, tell threads to use
a global lock */

253 mutex_unlock(&cache_lock); //后面开始扩容准备 

254 slabs_rebalancer_pause();//停止slab_move

255 switch_item_lock_type(ITEM_LOCK_GLOBAL);//转换为全局锁

256 mutex_lock(&cache_lock);

257 assoc_expand();//开始扩容

258 mutex_unlock(&cache_lock);

259 }

在对item进行操作时都会加上调用 item_lock,参数时桶的hash值,会根据锁的类型来决定是否阻塞对item的操作。

例如插入item操作:

517 int item_link(item *item) {

518 int ret;

519 uint32_t hv;

520 

521 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值

522 item_lock(hv); //根据锁的类型加锁

523 ret = do_item_link(item, hv);//插入操作

524 item_unlock(hv);//释放锁

525 return ret;

526 }

再例如删除操作(释放item的slab中的空间):

532 void item_remove(item *item) {

533 uint32_t hv;

534 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值

535 

536 item_lock(hv);//根据锁的类型加锁

537 do_item_remove(item);//删除操作

538 item_unlock(hv);//释放锁

539 }

在thread.c中可以看到依靠这种锁的操作有item_get,item_touch,item_unlink,item_update,add_delta,store_item等

memcached中锁粒度的转换大大提高了它的并发性,对于每个item的操作,还有refcount变量来表示一个item的状态,在多线程中如何不用对每个item加锁,而通过refcount来控制item的操作,这个在研究中,下次blog再写。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: