memcached(二)事件模型源码分析
2015-02-09 14:52
169 查看
memcachedd事件模型
在memcachedd中,作者为了专注于缓存的设计,使用了libevent来开发事件模型。memcachedd的时间模型同nginx的类似,拥有一个主进行(master)以及多个工作者线程(woker)。
流程图
在memcached中,是先对工作者线程进行初始化并启动,然后才会创建启动主线程。/** * Create a socket and bind it to a specific port number * @param interface the interface to bind to * @param port the port number to bind to * @param transport the transport protocol (TCP / UDP) * @param portnumber_file A filepointer to write the port numbers to * when they are successfully added to the list of ports we * listen on. */ static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) { int sfd; struct linger ling = {0, 0}; struct addrinfo *ai; struct addrinfo *next; struct addrinfo hints = { .ai_flags = AI_PASSIVE, .ai_family = AF_UNSPEC }; char port_buf[NI_MAXSERV]; int error; int success = 0; int flags =1; hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM; if (port == -1) { port = 0; } snprintf(port_buf, sizeof(port_buf), "%d", port); error= getaddrinfo(interface, port_buf, &hints, &ai); if (error != 0) { if (error != EAI_SYSTEM) fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); else perror("getaddrinfo()"); return 1; } for (next= ai; next; next= next->ai_next) { conn *listen_conn_add; if ((sfd = new_socket(next)) == -1) { /* getaddrinfo can return "junk" addresses, * we make sure at least one works before erroring. */ if (errno == EMFILE) { /* ...unless we're out of fds */ perror("server_socket"); exit(EX_OSERR); } continue; } #ifdef IPV6_V6ONLY if (next->ai_family == AF_INET6) { error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags)); if (error != 0) { perror("setsockopt"); close(sfd); continue; } } #endif setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); if (IS_UDP(transport)) { maximize_sndbuf(sfd); } else { error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); if (error != 0) perror("setsockopt"); error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); if (error != 0) perror("setsockopt"); error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); if (error != 0) perror("setsockopt"); } if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { if (errno != EADDRINUSE) { perror("bind()"); close(sfd); freeaddrinfo(ai); return 1; } close(sfd); continue; } else { success++; if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) { perror("listen()"); close(sfd); freeaddrinfo(ai); return 1; } if (portnumber_file != NULL && (next->ai_addr->sa_family == AF_INET || next->ai_addr->sa_family == AF_INET6)) { union { struct sockaddr_in in; struct sockaddr_in6 in6; } my_sockaddr; socklen_t len = sizeof(my_sockaddr); if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) { if (next->ai_addr->sa_family == AF_INET) { fprintf(portnumber_file, "%s INET: %u\n", IS_UDP(transport) ? "UDP" : "TCP", ntohs(my_sockaddr.in.sin_port)); } else { fprintf(portnumber_file, "%s INET6: %u\n", IS_UDP(transport) ? "UDP" : "TCP", ntohs(my_sockaddr.in6.sin6_port)); } } } } if (IS_UDP(transport)) { int c; for (c = 0; c < settings.num_threads_per_udp; c++) { /* Allocate one UDP file descriptor per worker thread; * this allows "stats conns" to separately list multiple * parallel UDP requests in progress. * * The dispatch code round-robins new connection requests * among threads, so this is guaranteed to assign one * FD to each thread. */ int per_thread_fd = c ? dup(sfd) : sfd; dispatch_conn_new(per_thread_fd, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, transport); } } else { if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } listen_conn_add->next = listen_conn; listen_conn = listen_conn_add; } } freeaddrinfo(ai); /* Return zero iff we detected no errors in starting up connections */ return success == 0; }
server_socket源码
主线程事件
在主线程中通过`conn_new`函数来建立主线程和工作者线程之间的关系。/* 设置线程事件 */ event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); c->ev_flags = event_flags; /* 注册事件到监听 */ if (event_add(&c->event, 0) == -1) { perror("event_add"); return NULL; }
事件处理
上面中设置了事件的回调函数`event_handler`,而在`event_handler`中,主要调用了`driver_machine`函数。driver_machine看名字就知道,想发动机一样的函数,那么该函数主要是处理各种事件以及相应的处理方法。
这里只简要介绍一个函数调用`dispatch_conn_new`。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; //循环获取工作者线程 last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item); //连接加入懂啊队列 memcachedD_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) {//向管道写入消息 perror("Writing to thread notify pipe"); } }
相关文章推荐
- memcached源码分析之线程模型
- Memcached源码分析(线程模型)
- Memcached 源码分析--网络模型流程分析
- nginx源码分析(4)——事件模型
- Memcached源码分析之网络模型篇
- Memcached源码分析之线程模型
- HBase源码分析之事件处理模型
- memcached源码分析之线程模型 【转】
- Memcached源码分析之线程模型
- Memcached源码分析(线程模型)
- memcached I/O模型源码分析
- Memcached源码分析--线程模型(二)
- (转载)Memcached源码分析(线程模型)
- memcached源码分析-----网络模型
- Memcached源码分析(线程模型)
- 非常适合新手的jq/zepto源码分析06 -- 事件模型
- Memcached源码分析之基于Libevent的网络模型
- Memcached源码分析(线程模型)
- Netty5源码分析(四) -- 事件分发模型
- Memcached源码分析(线程模型)