Memcached线程模型分析
2015-08-30 22:30
477 查看
前面提到过Memcached的线程模型:Memcached使用Libevent,以此为基础来处理事件。其原理为:启动时的线程为main thread,它包含一个event_base,之后创建多个worker thread;每个work thread中也有一个event_base。main thread中的event_base负责监听网络,接收新连接;当建立连接后就把新连接交给worker thread来处理。
Memcached的main函数在Memcached.c中。main函数中初始化了系统设置,初始化多线程:
1、初始化了main thread中的event_base、初始化多线程、初始化网络
2、接着main thread初始化worker thread。每个worker thread都对应一个结构体
了解了这个结构体,再来看一下main thread和worker thread的初始化过程。
在上面的代码中,
再来看线程的启动函数func(worker_libevent)
到了这里多线程模型已经基本创建完成了。
3、main thread创建listen socket并监听。
在conn_new设置监听连接。这里用到了一个连接的结构体
接下来看
4、再来看一下多线程模型中的事件处理,即main thread和worker thread如何协同工作处理事件。main thread的事件处理函数为
先来看main thread的事件处理函数
至此,main thread已经将连接交给了worker thread。接下来看worker thread怎么处理
至此,Memcached中,线程如何创建、关联event_base、设置event_handle、main thread和worker thread如何通信等问题已经基本理清,细节暂不考虑。
Memcached的main函数在Memcached.c中。main函数中初始化了系统设置,初始化多线程:
1、初始化了main thread中的event_base、初始化多线程、初始化网络
/* initialize main thread libevent instance */ main_base = event_init();//初始化main event_base …… /* start up worker threads if MT mode */ memcached_thread_init(settings.num_threads, main_base);//初始化工作线程 …… //初始化socket并监听。这里假设使用的是TCP if (settings.port && server_sockets(settings.port, tcp_transport, portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); }
2、接着main thread初始化worker thread。每个worker thread都对应一个结构体
LIBEVENT_THREAD,包含其相关信息
//定义在Memcached.h中 typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ } LIBEVENT_THREAD;
了解了这个结构体,再来看一下main thread和worker thread的初始化过程。
memcached_thread_init(settings.num_threads, main_base);//初始化工作线程 //函数memcached_thread_init定义在Thread.c中 void memcached_thread_init(int nthreads, struct event_base *main_base) { …… …… //LIBEVENT_THREAD包含了worker thread的相关信息,给worker thread的标识信息分配内存。 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can't allocate thread descriptors"); exit(1); } dispatcher_thread.base = main_base;//main thread的event_base dispatcher_thread.thread_id = pthread_self();//main thread的ID //为了能让main thread和workerthread通信,创建管道。worker thread监听管道的可读,当main thread需要 //唤醒worker thread时,向管道写信息即可。 for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } //把创建的管道和每个worker thread关联 threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; setup_thread(&threads[i]);//初始化每个worker thread /* Reserve three fds for the libevent base, and two for the pipe */ stats.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ //创建并启动worker thread。worker_libevent是worker线程的启动函数 for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ //等待所有worker thread都启动 pthread_mutex_lock(&init_lock); //每个线程启动后都会notify信号量,这里等待nthreads次notify。 wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); }
在上面的代码中,
setup_thread设置worker thread的event相关信息;
create_worker是创建并启动线程。
//setup_thread函数 static void setup_thread(LIBEVENT_THREAD *me) { me->base = event_init();//worker thread的event_base /* Listen for notifications from other threads */ //监听创建的管道事件,事件处理函数为thread_libevent_process event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event);//将事件和其event_base关联 if (event_add(&me->notify_event, 0) == -1) {//将事件添加到event_base fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } me->new_conn_queue = malloc(sizeof(struct conn_queue));//为worker thread创建监听事件队列 cq_init(me->new_conn_queue);//初始化这个队列 …… } //create_worker函数 static void create_worker(void *(*func)(void *), void *arg) { …… //创建线程,func为线程启动函数。这里func为worker_libevent,arg为threads[i] if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); exit(1); } }
再来看线程的启动函数func(worker_libevent)
static void *worker_libevent(void *arg) { LIBEVENT_THREAD *me = arg; /* Any per-thread setup can happen here; memcached_thread_init() will block until * all threads have finished initializing. */ register_thread_initialized();//这里向main thread发起信号量的notify。main thread在wait_for_thread_registration等待每个线程都启动 event_base_loop(me->base, 0); return NULL; }
到了这里多线程模型已经基本创建完成了。
3、main thread创建listen socket并监听。
//初始化socket并监听 main() { …… if (settings.port && server_sockets(settings.port, tcp_transport, portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); } …… } //函数server_sockets。其实内部又调用了一个函数server_socket,看这个函数 static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) { int sfd; hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;//判断是TCP还是UDP for (next= ai; next; next= next->ai_next) { conn *listen_conn_add; if ((sfd = new_socket(next)) == -1) {//创建socket fd continue; } setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));//设置sfd属性 if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {//绑定地址 close(sfd); continue; } else { success++; if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) { perror("listen()"); close(sfd); freeaddrinfo(ai); return 1; } } if (IS_UDP(transport)) { …… } else { if (!(listen_conn_add = conn_new(sfd, conn_listening,//在conn_new中监听连接 EV_READ | EV_PERSIST, 1, transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } listen_conn_add->next = listen_conn; listen_conn = listen_conn_add; } } freeaddrinfo(ai); /* Return zero iff we detected no errors in starting up connections */ return success == 0; }
在conn_new设置监听连接。这里用到了一个连接的结构体
conn,字段比较多,那几个字段看一下
//在Memcached.h中 typedef struct conn conn; struct conn { int sfd;//连接的fd sasl_conn_t *sasl_conn; bool authenticated; enum conn_states state;//连接状态 enum bin_substates substate; rel_time_t last_cmd_time; struct event event;//连接时间 short ev_flags; …… conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ };
接下来看
conn_new函数
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base) { conn *c; assert(sfd >= 0 && sfd < max_fds); c = conns[sfd]; …… …… if (transport == tcp_transport && init_state == conn_new_cmd) {//如果是新连接,则初始化 if (getpeername(sfd, (struct sockaddr *) &c->request_addr, &c->request_addr_size)) { perror("getpeername"); memset(&c->request_addr, 0, sizeof(c->request_addr)); } } …… event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//创建监听的event,事件处理函数为event_handler event_base_set(base, &c->event);//将event和event_base相关联 c->ev_flags = event_flags; if (event_add(&c->event, 0) == -1) {//添加event事件 perror("event_add"); return NULL; } …… return c; }
4、再来看一下多线程模型中的事件处理,即main thread和worker thread如何协同工作处理事件。main thread的事件处理函数为
event_handler,worker thread的事件处理函数为
thread_libevent_process。
先来看main thread的事件处理函数
event_handler
void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg;//arg是新建的连接 assert(c != NULL); c->which = which; …… drive_machine(c);//这里处理连接 /* wait for next event */ return; } //函数drive_machine static void drive_machine(conn *c) { while (!stop) { switch(c->state) {//根据连接状态来处理 case conn_listening: sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);//接收连接 if () { …… } else { dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); } stop = true; break; …… } } return; } //函数dispatch_conn_new。在Thread.c中 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new();//创建结构体CQ_ITEM,用来保存连接信息 char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } int tid = (last_thread + 1) % settings.num_threads;//使用Round Robin分配worker thread LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; //给结构体CQ_ITEM赋值 item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item);//将item添加到worker thread MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) {//通知worker thread perror("Writing to thread notify pipe"); } }
至此,main thread已经将连接交给了worker thread。接下来看worker thread怎么处理
static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; if (read(fd, buf, 1) != 1) if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); switch (buf[0]) { case 'c'://main thread给worker thread插入item后,发送了'c' item = cq_pop(me->new_conn_queue);//取出这个Item if (NULL != item) { //设置这个连接的event事件,并添加到event_base conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } cqi_free(item); } break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); break; } }
至此,Memcached中,线程如何创建、关联event_base、设置event_handle、main thread和worker thread如何通信等问题已经基本理清,细节暂不考虑。
相关文章推荐
- Memcached 笔记与总结(1)Linux(CentOS 6.6) 和 Windows(7)下安装与配置 Memcached (1.4.24)与 Memcached 基础命令
- 安装和使用memcached
- Memcached概述
- memcache使用简记
- memcached缓存集群搭建
- memcached服务器设为开机自启动
- memcached完全剖析–1. memcached的基础
- Memcached介绍和安装
- Redis和Memcache的区别总结
- ehcache memcache redis 三大缓存男高音
- GPS部标监控平台的架构设计(十一)-基于Memcached的分布式Gps监控平台
- memcached 常见问题 翻译
- 如何将PHP session信息缓存到memcached里面
- linux安装memcache过程
- cmem、redis、memcache的简单比较
- memcached协议
- tomcat memcached session 共享实现
- linux memcache安装
- memcached并发处理
- Memcached之缓存雪崩,缓存穿透,缓存预热,缓存算法(7)