Memcached中item锁的粒度
2015-08-06 19:17
507 查看
在多线程环境中,对于memcached中的item操作,应该对它加锁,如果加所有的item都加一个全局锁,这样对于一把锁来控制所有hash桶中的item,粒度实在是太大了,一个桶的插入删除操作会阻塞另一个桶的操作。如果锁粒度很小的话,例如每个hash桶加锁,那么如果是在hash桶扩容的情况下,一次扩容操作可能涉及到多个桶数据的迁移,这样需要对多个桶的锁进行循环加锁,这样就有些复杂。Memcached的解决方法就是在按照情况进行锁粒度的转换。
具体的解决办法是:memcached在扩容操作时,加的都是全局锁,就是所有item(所有hash桶中的)都是一把锁,在扩容操作中,item的操作,例如hash表的删除,插入,touch,查询都是去竞争那个全局锁,因为原来的元素在old_table中的元素需要rehash到primary_table,虽然可以在old_table中的每个桶上加锁,但是没法控制primary_table的多进程操作,小于expand_bucket的元素会直接进入primary_table,old_table的元素会按照新的hash值进入到primary中,不能确定rehash到primary_table的哪个桶中,所以这时侯只能获取全局锁。在扩容结束时,item锁被重新切换回hash桶上的锁,这里锁是分段加锁的(几个桶一个锁,这个具体数值取决与初始的worker的数量,worker数量越多,锁越细,越少hash桶公用一个锁)。默认hash桶是的1<<16个hash桶
assoc.c
62 void assoc_init(const int hashtable_init) {
63 if (hashtable_init) {
//初始为空
64 hashpower = hashtable_init;
65 }
66 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
这里默认的hashtable_init为空,这里的hashpower=HASHPOWER_DEFAULT,等于16
thread.c
786 if (nthreads < 3) {
787 power = 10;
788 } else if (nthreads < 4) {
789 power = 11;
790 } else if (nthreads < 5) {
791 power = 12;
792 } else {
793 /* 8192 buckets, and central
locks don't scale much past 5 threads */
794 power = 13;
795 }
796
797 item_lock_count = hashsize(power);
798
799 item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
可以看出在小于3个worker的时候,在默认hash桶的数量下,相隔64个桶一个锁,在4的时候相隔32个桶一把锁,到了6个以上的worker的时候默认情况下相隔8个桶一个锁。
在每次操作锁的时候会判断下锁的类型,如果全局锁,就对全局加锁,如果是细粒度()的锁,就通过hash值去取得hash桶的锁
124 void item_lock(uint32_t hv) {
125 uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
126 if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {//细粒度的锁
127 mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
//取得hash值在去取得相应的锁
128 } else {
129 mutex_lock(&item_global_lock);//全局锁
130 }
131 }
每个线程的锁的类型存在了每个线程的私有空间中,用函数 pthread_setspecific和 pthread_getspecific取得,每个线程默认锁的类型是分段锁(hash桶锁)
368 static void *worker_libevent(void *arg) {
369 LIBEVENT_THREAD *me = arg;
370
371 /* Any per-thread setup can happen here; thread_init() will
block until
372 * all threads have finished initializing.
373 */
374
375 /* set an indexable thread-specific
memory item for the lock type.
376 * this could be unnecessary if we pass the conn *c struct through
377 * all item_lock calls...
378 */
379 me->item_lock_type = ITEM_LOCK_GRANULAR;
380 pthread_setspecific(item_lock_type_key, &me->item_lock_type);
381
382 register_thread_initialized();
383
384 event_base_loop(me->base, 0);
385 return NULL;
386 }
如果在hash空间不够的时候,需要对hash表进行扩容,这时候需要转换item的锁从分段锁转化成为全局锁,由于hash扩容操作是一个单独的线程在做,改变锁的类型需要改变所有worker对item的锁的类型,这时候memcached是通过循环每个worker对他们的pipe的一端写入锁转换的命令,在每个worker其实都有两套libevent
loop,一套用于监听其他线程发给work的信息,例如主线程去监听网络,如果有连接来了,主线程通过这个pipe通知worker,还有就是这个锁的类型的变化,通过pipe通知,另一套loop就是去通过那个连接的fd去轮询网络中的读事件等。
174 void switch_item_lock_type(enum item_lock_types type) {
175 char buf[1];//通过pipe发给worker的char
buf
176 int i;
177
178 switch (type) {
179 case ITEM_LOCK_GRANULAR:
180 buf[0] = 'l';//转换为分段锁
181 break;
182 case ITEM_LOCK_GLOBAL:
183 buf[0] = 'g';//转换为全局锁
184 break;
185 default:
186 fprintf(stderr, "Unknown lock type: %d\n", type);
187 assert(1 == 0);
188 break;
189 }
190
191 pthread_mutex_lock(&init_lock);
192 init_count = 0;
193 for (i = 0; i < settings.num_threads; i++) {
194 if (write(threads[i].notify_send_fd, buf, 1) != 1) {
//向每个worker的notify_send_fd (pipe的一端)写入buf
195 perror("Failed writing to notify pipe");
196 /* TODO: This is a
fatal problem. Can it ever happen temporarily? */
197 }
198 }
199 wait_for_thread_registration(settings.num_threads);//等待每个线程处理完设置完锁的类型
200 pthread_mutex_unlock(&init_lock);
201 }
在thread.c中锁的类型转换:
427 case 'l':
428 me->item_lock_type = ITEM_LOCK_GRANULAR;
429 register_thread_initialized();
430 break;
431 case 'g':
432 me->item_lock_type = ITEM_LOCK_GLOBAL;
433 register_thread_initialized();
434 break;
435 }
通过连接每个worker的管道就设置了锁的类型。
在hash扩容时会加全局锁
204 static void *assoc_maintenance_thread(void *arg) {
205
206 while (do_run_maintenance_thread) {
207 int ii = 0;
208
209 /* Lock the cache, and bulk
move multiple buckets to the new
210 * hash table. */
211 item_lock_global();//加的是全局锁
212 mutex_lock(&cache_lock);
在没有开始扩容时会把锁粒度调整到分段锁,同时打开slab_move,详见前一篇blog,扩容会使线程停留在等待 maintenance_cond上,在hash容量操作了桶的1.5倍后slab_move被停止,同时锁的粒度转换为全局锁,开始hash扩容
244 if (!expanding) {
245 /* finished expanding. tell all threads to use
fine-grained locks */
246 switch_item_lock_type(ITEM_LOCK_GRANULAR);//默认就是分段锁
247 slabs_rebalancer_resume(); //打开slab_move
248 /* We are done expanding.. just
wait for next invocation */
249 mutex_lock(&cache_lock);
250 started_expanding = false;
251 pthread_cond_wait(&maintenance_cond, &cache_lock);//在不扩容时,线程会停留在这里
252 /* Before doing anything, tell threads to use
a global lock */
253 mutex_unlock(&cache_lock); //后面开始扩容准备
254 slabs_rebalancer_pause();//停止slab_move
255 switch_item_lock_type(ITEM_LOCK_GLOBAL);//转换为全局锁
256 mutex_lock(&cache_lock);
257 assoc_expand();//开始扩容
258 mutex_unlock(&cache_lock);
259 }
在对item进行操作时都会加上调用 item_lock,参数时桶的hash值,会根据锁的类型来决定是否阻塞对item的操作。
例如插入item操作:
517 int item_link(item *item) {
518 int ret;
519 uint32_t hv;
520
521 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值
522 item_lock(hv); //根据锁的类型加锁
523 ret = do_item_link(item, hv);//插入操作
524 item_unlock(hv);//释放锁
525 return ret;
526 }
再例如删除操作(释放item的slab中的空间):
532 void item_remove(item *item) {
533 uint32_t hv;
534 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值
535
536 item_lock(hv);//根据锁的类型加锁
537 do_item_remove(item);//删除操作
538 item_unlock(hv);//释放锁
539 }
在thread.c中可以看到依靠这种锁的操作有item_get,item_touch,item_unlink,item_update,add_delta,store_item等
memcached中锁粒度的转换大大提高了它的并发性,对于每个item的操作,还有refcount变量来表示一个item的状态,在多线程中如何不用对每个item加锁,而通过refcount来控制item的操作,这个在研究中,下次blog再写。
具体的解决办法是:memcached在扩容操作时,加的都是全局锁,就是所有item(所有hash桶中的)都是一把锁,在扩容操作中,item的操作,例如hash表的删除,插入,touch,查询都是去竞争那个全局锁,因为原来的元素在old_table中的元素需要rehash到primary_table,虽然可以在old_table中的每个桶上加锁,但是没法控制primary_table的多进程操作,小于expand_bucket的元素会直接进入primary_table,old_table的元素会按照新的hash值进入到primary中,不能确定rehash到primary_table的哪个桶中,所以这时侯只能获取全局锁。在扩容结束时,item锁被重新切换回hash桶上的锁,这里锁是分段加锁的(几个桶一个锁,这个具体数值取决与初始的worker的数量,worker数量越多,锁越细,越少hash桶公用一个锁)。默认hash桶是的1<<16个hash桶
assoc.c
62 void assoc_init(const int hashtable_init) {
63 if (hashtable_init) {
//初始为空
64 hashpower = hashtable_init;
65 }
66 primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
这里默认的hashtable_init为空,这里的hashpower=HASHPOWER_DEFAULT,等于16
thread.c
786 if (nthreads < 3) {
787 power = 10;
788 } else if (nthreads < 4) {
789 power = 11;
790 } else if (nthreads < 5) {
791 power = 12;
792 } else {
793 /* 8192 buckets, and central
locks don't scale much past 5 threads */
794 power = 13;
795 }
796
797 item_lock_count = hashsize(power);
798
799 item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
可以看出在小于3个worker的时候,在默认hash桶的数量下,相隔64个桶一个锁,在4的时候相隔32个桶一把锁,到了6个以上的worker的时候默认情况下相隔8个桶一个锁。
在每次操作锁的时候会判断下锁的类型,如果全局锁,就对全局加锁,如果是细粒度()的锁,就通过hash值去取得hash桶的锁
124 void item_lock(uint32_t hv) {
125 uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
126 if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {//细粒度的锁
127 mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
//取得hash值在去取得相应的锁
128 } else {
129 mutex_lock(&item_global_lock);//全局锁
130 }
131 }
每个线程的锁的类型存在了每个线程的私有空间中,用函数 pthread_setspecific和 pthread_getspecific取得,每个线程默认锁的类型是分段锁(hash桶锁)
368 static void *worker_libevent(void *arg) {
369 LIBEVENT_THREAD *me = arg;
370
371 /* Any per-thread setup can happen here; thread_init() will
block until
372 * all threads have finished initializing.
373 */
374
375 /* set an indexable thread-specific
memory item for the lock type.
376 * this could be unnecessary if we pass the conn *c struct through
377 * all item_lock calls...
378 */
379 me->item_lock_type = ITEM_LOCK_GRANULAR;
380 pthread_setspecific(item_lock_type_key, &me->item_lock_type);
381
382 register_thread_initialized();
383
384 event_base_loop(me->base, 0);
385 return NULL;
386 }
如果在hash空间不够的时候,需要对hash表进行扩容,这时候需要转换item的锁从分段锁转化成为全局锁,由于hash扩容操作是一个单独的线程在做,改变锁的类型需要改变所有worker对item的锁的类型,这时候memcached是通过循环每个worker对他们的pipe的一端写入锁转换的命令,在每个worker其实都有两套libevent
loop,一套用于监听其他线程发给work的信息,例如主线程去监听网络,如果有连接来了,主线程通过这个pipe通知worker,还有就是这个锁的类型的变化,通过pipe通知,另一套loop就是去通过那个连接的fd去轮询网络中的读事件等。
174 void switch_item_lock_type(enum item_lock_types type) {
175 char buf[1];//通过pipe发给worker的char
buf
176 int i;
177
178 switch (type) {
179 case ITEM_LOCK_GRANULAR:
180 buf[0] = 'l';//转换为分段锁
181 break;
182 case ITEM_LOCK_GLOBAL:
183 buf[0] = 'g';//转换为全局锁
184 break;
185 default:
186 fprintf(stderr, "Unknown lock type: %d\n", type);
187 assert(1 == 0);
188 break;
189 }
190
191 pthread_mutex_lock(&init_lock);
192 init_count = 0;
193 for (i = 0; i < settings.num_threads; i++) {
194 if (write(threads[i].notify_send_fd, buf, 1) != 1) {
//向每个worker的notify_send_fd (pipe的一端)写入buf
195 perror("Failed writing to notify pipe");
196 /* TODO: This is a
fatal problem. Can it ever happen temporarily? */
197 }
198 }
199 wait_for_thread_registration(settings.num_threads);//等待每个线程处理完设置完锁的类型
200 pthread_mutex_unlock(&init_lock);
201 }
在thread.c中锁的类型转换:
427 case 'l':
428 me->item_lock_type = ITEM_LOCK_GRANULAR;
429 register_thread_initialized();
430 break;
431 case 'g':
432 me->item_lock_type = ITEM_LOCK_GLOBAL;
433 register_thread_initialized();
434 break;
435 }
通过连接每个worker的管道就设置了锁的类型。
在hash扩容时会加全局锁
204 static void *assoc_maintenance_thread(void *arg) {
205
206 while (do_run_maintenance_thread) {
207 int ii = 0;
208
209 /* Lock the cache, and bulk
move multiple buckets to the new
210 * hash table. */
211 item_lock_global();//加的是全局锁
212 mutex_lock(&cache_lock);
在没有开始扩容时会把锁粒度调整到分段锁,同时打开slab_move,详见前一篇blog,扩容会使线程停留在等待 maintenance_cond上,在hash容量操作了桶的1.5倍后slab_move被停止,同时锁的粒度转换为全局锁,开始hash扩容
244 if (!expanding) {
245 /* finished expanding. tell all threads to use
fine-grained locks */
246 switch_item_lock_type(ITEM_LOCK_GRANULAR);//默认就是分段锁
247 slabs_rebalancer_resume(); //打开slab_move
248 /* We are done expanding.. just
wait for next invocation */
249 mutex_lock(&cache_lock);
250 started_expanding = false;
251 pthread_cond_wait(&maintenance_cond, &cache_lock);//在不扩容时,线程会停留在这里
252 /* Before doing anything, tell threads to use
a global lock */
253 mutex_unlock(&cache_lock); //后面开始扩容准备
254 slabs_rebalancer_pause();//停止slab_move
255 switch_item_lock_type(ITEM_LOCK_GLOBAL);//转换为全局锁
256 mutex_lock(&cache_lock);
257 assoc_expand();//开始扩容
258 mutex_unlock(&cache_lock);
259 }
在对item进行操作时都会加上调用 item_lock,参数时桶的hash值,会根据锁的类型来决定是否阻塞对item的操作。
例如插入item操作:
517 int item_link(item *item) {
518 int ret;
519 uint32_t hv;
520
521 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值
522 item_lock(hv); //根据锁的类型加锁
523 ret = do_item_link(item, hv);//插入操作
524 item_unlock(hv);//释放锁
525 return ret;
526 }
再例如删除操作(释放item的slab中的空间):
532 void item_remove(item *item) {
533 uint32_t hv;
534 hv = hash(ITEM_key(item), item->nkey, 0);//取得hash值
535
536 item_lock(hv);//根据锁的类型加锁
537 do_item_remove(item);//删除操作
538 item_unlock(hv);//释放锁
539 }
在thread.c中可以看到依靠这种锁的操作有item_get,item_touch,item_unlink,item_update,add_delta,store_item等
memcached中锁粒度的转换大大提高了它的并发性,对于每个item的操作,还有refcount变量来表示一个item的状态,在多线程中如何不用对每个item加锁,而通过refcount来控制item的操作,这个在研究中,下次blog再写。
相关文章推荐
- Nginx+Tomcat+Memcached集群Session共享
- Memcached(简单的,纯内存的)
- Memcached集群/分布式/高可用 及 Magent缓存代理搭建过程 详解
- 使用memcache存储数据
- Memcached的配置,SSH项目中的整合(com.whalin),Memcached工具类,Memcached的代码调用
- Memcached的配置,SSH项目中的整合(com.whalin),Memcached工具类,Memcached的代码调用
- memcached源码分析之四
- memcached真实项目中的应用
- memcached基本配置与使用
- Redis,Memcache,mongoDB的区别
- Redis,Memcache,mongoDB的区别
- 那点所谓的分布式——memcache
- memcache分布式部署的原理分析
- memcache 分布式,算法实现
- Mac Yosemite安装配置nginx+php+mysql+memcached环境
- If-Memcached集成手册
- AWS中使用Memcached作为hibernate的二级缓存
- 【ITOO】--memcache小结
- Memcached、Redis、RDD(Spark)的数据处理性能对比(Efficient in-memory data management: an analysis论文翻译)
- memcached的失效时间设置注意点