Memcached源码拆分:Libevent_Thread
2011-11-04 17:01
316 查看
个人笔记,不做详细介绍
最近看完Memcached的源码后总是忍不住再看一遍,虽然有些地方写得有点不是那么华丽,但是总感觉想从Memcached里面挖出一些东西来,所以就再看了其他人的关于Memached的源码的一些分析,加上自己对Memcached的理解,目前觉得Memcached的源码里有下面几部分可以好好学学:
(1)SLAB内存分配策略
(2)线程模型
(3)Memcached通信协议
(4)Consistent Hashing
(5)LRU策略
(6)UNIX domain,UDP,TCP客户端连接
(7)Libevent
(8)Memcached的分布式
今天先从线程模型开始吧,简单说说然后直接上代码(原谅我现在越来越懒了)。
主要过程:
1 主进程server_socket负责接受外部发起的连接,并且将这个建立后的连接交给workerThread处理,选择workerThread的方式是轮询。
2 workerThread负责“接受外部连接”,并且将已经建立的连接封装成conn对象,然后委托给一个event_handler函数做后续处理。
3 主线程和workerThread之间的通信是通过pipe来完成,workerThread在rev_fd上监听事件,而主线程则向fd写入一个字节表示新连接到来,同时也push一个item到全局的queue中。
4 workerThread监听到这个事件后就从queue中拿出item封装成conn对象,其中就包含了外部连接的一些属性,这时候workerThread可以独立跟外部连接通信。
我把Memcached中关于线程模型的代码拆剪了出来,变成一个简单可执行的程序。主要包括以下几个文件
1 main.cpp
初始化WorkerThread(实际上每个WorkerThread负责一个外部连接),启动主线程server connect事件监听(这里我不用socket通信,直接监听命令行),一旦命令行有命令输入,就会激发事件,然后主线程就会以轮询的方式从空闲线程中选出一条来进行后续操作,并通过pipe通知该workerThread有新item可以处理。
conn.cpp
//#include<pthread.h> //#include<event.h> #include"conn.h" #include<unistd.h> static pthread_mutex_t conn_lock=PTHREAD_MUTEX_INITIALIZER; static int freecurr; static int freetotal; static conn **freeconns; //static //why can not use static void conn_init(void) { //每个Worker的 freetotal = 200; freecurr = 0; if ((freeconns =(conn**)calloc(freetotal, sizeof(conn *))) == NULL) { fprintf(stderr, "Failed to allocate connection structures\n"); } return; } conn *conn_new(const int sfd,enum conn_states init_state,const int event_flags, const int read_buffer_size,enum network_transport transport, struct event_base *base) { conn *c=conn_from_freelist(); if(NULL==c) { if(!(c=(conn*)calloc(1,sizeof(conn)))) { fprintf(stderr,"calloc()\n"); return NULL; } //initiate c /* { MEMCACHED_CONN_CREATE(c); c->rbuf = c->wbuf = 0; c->ilist = 0; c->suffixlist = 0; c->iov = 0; c->msglist = 0; c->hdrbuf = 0; c->rsize = read_buffer_size; c->wsize = DATA_BUFFER_SIZE; c->isize = ITEM_LIST_INITIAL; c->suffixsize = SUFFIX_LIST_INITIAL; c->iovsize = IOV_LIST_INITIAL; c->msgsize = MSG_LIST_INITIAL; c->hdrsize = 0; c->rbuf = (char *)malloc((size_t)c->rsize); c->wbuf = (char *)malloc((size_t)c->wsize); c->ilist = (item **)malloc(sizeof(item *) * c->isize); c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize); c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize); c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize); //根据配置大小来分配,可能会分配失败 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 || c->msglist == 0 || c->suffixlist == 0) { conn_free(c); fprintf(stderr, "malloc()\n"); return NULL; } //end initiate c } */ } //initiate c /* { c->transport = transport; //传输方式 c->protocol = settings.binding_protocol; //传输协议 // unix socket mode doesn't need this, so zeroed out. but why // is this done for every command? presumably for UDP // mode. if (!settings.socketpath) { c->request_addr_size = sizeof(c->request_addr); } else { c->request_addr_size = 0; } if (settings.verbose > 1) { if (init_state == conn_listening) { fprintf(stderr, "<%d server listening (%s)\n", sfd, prot_text(c->protocol)); } else if (IS_UDP(transport)) { fprintf(stderr, "<%d server listening (udp)\n", sfd); } else if (c->protocol == negotiating_prot) { fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); } else if (c->protocol == ascii_prot) { fprintf(stderr, "<%d new ascii client connection.\n", sfd); } else if (c->protocol == binary_prot) { fprintf(stderr, "<%d new binary client connection.\n", sfd); } else { fprintf(stderr, "<%d new unknown (%d) client connection\n", sfd, c->protocol); assert(false); } } //信息赋值给conn结构,conn的构造把item全用上了 c->sfd = sfd; c->state = init_state; c->rlbytes = 0; c->cmd = -1; c->rbytes = c->wbytes = 0; c->wcurr = c->wbuf; c->rcurr = c->rbuf; c->ritem = 0; c->icurr = c->ilist; c->suffixcurr = c->suffixlist; c->ileft = 0; c->suffixleft = 0; c->iovused = 0; c->msgcurr = 0; c->msgused = 0; c->write_and_go = init_state; c->write_and_free = 0; c->item = 0; c->noreply = false; } */ //end initiate c c->sfd=sfd; printf("test fd is:%d\n",sfd); event_set(&c->event,sfd,event_flags,event_handler,(void*)c); event_base_set(base,&c->event); c->ev_flags=event_flags; if(event_add(&c->event,0)==-1) { if( conn_add_to_freelist(c)) conn_free(c); perror("event_add"); return NULL; } //MEMCACHED_CONN_ALLOCATE(c->sfd); return c; } conn *conn_from_freelist() { conn *c; pthread_mutex_lock(&conn_lock); if(freecurr>0) c=freeconns[--freecurr]; else c=NULL; pthread_mutex_unlock(&conn_lock); } void event_handler(const int fd,const short which,void *arg) { //real handler char buf[128]; int rc=read(fd,buf,sizeof(buf)); buf[rc]='\0'; printf("%s\n",buf); } bool conn_add_to_freelist(conn *c) { bool ret = true; pthread_mutex_lock(&conn_lock); if (freecurr < freetotal) { freeconns[freecurr++] = c; ret = false; } else { // try to enlarge free connections array size_t newsize = freetotal * 2; conn **new_freeconns = (conn**)realloc(freeconns, sizeof(conn *) * newsize); if (new_freeconns) { freetotal = newsize; freeconns = new_freeconns; freeconns[freecurr++] = c; ret = false; } } pthread_mutex_unlock(&conn_lock); return ret; } void conn_free(conn *c) { if (c) { /* MEMCACHED_CONN_DESTROY(c); if (c->hdrbuf) free(c->hdrbuf); if (c->msglist) free(c->msglist); if (c->rbuf) free(c->rbuf); if (c->wbuf) free(c->wbuf); if (c->ilist) free(c->ilist); if (c->suffixlist) free(c->suffixlist); if (c->iov) free(c->iov); */ free(c); } }
源码:memcached的线程模型.rar
相关文章推荐
- Memcached源码拆分:Libevent_Thread
- memcached源码阅读----使用libevent和多线程模型
- Memcached源码分析之基于Libevent的网络模型
- Libevent源码分析-----多线程、锁、条件变量(一)如evthread_use_windows_threads
- Libevent源码分析-----evthread_notify_base通知主线程
- Linux c 开发 - Memcached源码分析之基于Libevent的网络模型(1)
- Memcached源码拆分:Slabs
- libevent 和 memcached 源码学习
- Libevent源码分析-----evthread_notify_base通知主线程
- memcached源码阅读----使用libevent和多线程模型
- Libevent源码分析-----evthread_notify_base通知主线程
- 高性能,分布式,轻量级缓存组件memcached的源码剖析---第二篇,libevent
- Libevent源码分析-----evthread_notify_base通知主线程
- Memcached源码分析之thread.c
- Memcached内存管理源码阅读
- HandlerThread源码解析
- rt-thread装载共享目标文件的过程源码分析
- libevent源码分析
- libevent源码深度剖析 一 1
- memcached源码分析-----item过期失效处理以及LRU爬虫