您的位置:首页 > 理论基础 > 计算机网络

Memcached源码解析--多线程网络模型

2015-09-24 16:00 519 查看
摘自http://blog.chinaunix.net/uid-22312037-id-3563755.html

一、网络模型

Memcache网络底层采用的Libevent,其网络模型就是大名鼎鼎的半同步半异步。关于Libevent可以参看我之前写的一篇拙文《Libevent源码解析--事件处理框架》,半同步半异步可以看这篇有名的博文

在阅读本文前,建议先阅读《Libevent源码解析--事件处理框架》,这样很多概念便于理解,很多组件或术语在这儿不再赘述。

二、多线程模型

1、关键结构

老规矩

,先介绍下关键的数据结构:

(1)CQ_ITEM

CQ_ITEM定义在Thread.c/21行,具体如下:

typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {

int sfd;

enum conn_states init_state;

int event_flags;

int read_buffer_size;

enum network_transport transport;

CQ_ITEM *next;

};

可以将这个结构体看着是主线程accept触发时即有客户端连入时,主线程写入工作线程有关socket连接相关句柄数据结构,绑定了socket描述符、状态、发生的事件、读buffer大小等,看着解释应该不难对号入座吧



(2)CQ

conn_queue定义在Thread.c/33行,具体如下:

typedef struct conn_queue CQ;

struct conn_queue {

CQ_ITEM *head;

CQ_ITEM *tail;

pthread_mutex_t lock;

pthread_cond_t cond;

};

这是socket连接通知队列,这句都看不明白的打PP



(3)LIBEVENT_THREAD

LIBEVENT_THREAD定义在Memcached.h/351行,具体如下:

typedef struct {

pthread_t thread_id; /* unique ID of this thread */

struct event_base *base; /* libevent handle this thread uses */

struct event notify_event; /* listen event for notify pipe */

int notify_receive_fd; /* receiving end of notify pipe */

int notify_send_fd; /* sending end of notify pipe */

struct thread_stats stats; /* Stats generated by this thread */

struct conn_queue *new_conn_queue; /* queue of new connections to handle */

cache_t *suffix_cache; /* suffix cache */

uint8_t item_lock_type; /* use fine-grained or global item lock */

} LIBEVENT_THREAD;

可以将这个结构体看着是线程句柄数据结构,绑定了线程ID、Libevent实例、用于通知管道的event、通知接收的socket描述符,通知发送的socket描述符、socketl连接通知队列,这也很好对号入座吧



(4)conn

conn定义在Memcached.h/371行,具体如下:

typedef struct conn conn;

struct conn {

int sfd;

...

struct event event;

short ev_flags;

short which; /** which events were just triggered */

...

LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */

};

这个结构非常庞大,提取关键的几个字段来分析一下, 可以将这个结构体看着是socket连接句柄数据结构,绑定了socket描述符、触发的事件、处理连接的线程指针等,这个也很好对号入座吧



2、整体流程

整体流程如下图:



图1 整体流程
(1)在main函数中调用main_base = event_init()来初始化主线程Libevent实例(Memcached.c/5125)。

(2)在main函数中调用thread_init(Thread.c/772)来初始化工作线程(Memcached.c/5142),并将主线程Libevent实例作为参数传入。

(3)在thread_init函数中为指定数量的工作线程分配内存(Thread.c/810行),为每个线程创建管道(Thread.c/821行),并分别绑定到通知收(Thread.c/826行)和发(Thread.c/827行)的socket描述符上,调用函数setup_thread初始化线程信息(Thread.c/328行),调用函数create_worker为每个线程注册回调函数(Thread.c/301行)。关键代码秀一下:

for (i = 0; i < nthreads; i++) {

int fds[2];

if (pipe(fds)) {

...

}

threads[i].notify_receive_fd = fds[0];

threads[i].notify_send_fd = fds[1];

setup_thread(&threads[i]);

...

}

/* Create threads after we've done all the libevent setup. */

for (i = 0; i < nthreads; i++) {

create_worker(worker_libevent, &threads[i]);

}

(4)在setup_thread函数中,为工作线程初始化Libevent实例(Thread.c/329行),为主线程通知读(notify_receive_fd)注册回调函数thread_libevent_process(Thread.c/336行),初始化cq队列(Thread.c/350行),关键代码如下:

static void setup_thread(LIBEVENT_THREAD *me) {

me->base = event_init();

...

/* Listen for notifications from other threads */

event_set(&me->notify_event, me->notify_receive_fd,

EV_READ | EV_PERSIST, thread_libevent_process, me);

event_base_set(me->base, &me->notify_event);

if (event_add(&me->notify_event, 0) == -1) {

...

}

me->new_conn_queue = malloc(sizeof(struct conn_queue));

...

cq_init(me->new_conn_queue);

...

}

(5)在thread_libevent_process函数中,读取主线程发送的通知接收消息(Thread.c/398行),将主线程accept来的fd注册到工作线程的Libevent实例中(Thread.c/407行),主线程accept来的fd从conn_queue队列获取(Thread.c/404行),关键代码如下:

static void thread_libevent_process(int fd, short which, void *arg) {

LIBEVENT_THREAD *me = arg;

CQ_ITEM *item;

char buf[1];

if (read(fd, buf, 1) != 1)

...

switch (buf[0]) {

case 'c':

item = cq_pop(me->new_conn_queue);

if (NULL != item) {

conn *c = conn_new(item->sfd, item->init_state, item->event_flags,

item->read_buffer_size, item->transport, me->base);

...

}

}

(6)在函数conn_new中,创建conn句柄(Memcached.c/358行),为句柄注册回调函数event_handler处理事件(Memcached.c/452行),将该句柄作为参数传入回调函数并设置到Libevent中(Memcached.c/453行),该函数的关键代码如下:

conn *conn_new(const int sfd, enum conn_states init_state,

const int event_flags,

const int read_buffer_size, enum network_transport transport,

struct event_base *base) {

conn *c = conn_from_freelist();

if (NULL == c) {

if (!(c = (conn *)calloc(1, sizeof(conn)))) {

...

}

...

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

event_base_set(base, &c->event);

c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {

...

}

...

}

(7)在create_worker函数中,创建工作线程并注册回调函数(Thread.c/308行),在工作线程的回调函数work_libevent中,开始Libevent主循环(Thread.c/384行)。

(8)在main函数中,调用函数server_sockets(Memcached.c/5199行),再调用函数server_socket(Memcached.c/4299行),进而调用函数new_socket(Memcached.c/4164行),在调用函数conn_new(Memcached.c/4252行),创建并注册listen
fd到主线程Libevent实例上,最后开始Libevent主循环即event_base_loop(Memcached.c/5228行)。在conn_new函数关键代码见步骤(6)

(9)在event_handler函数中,调用函数drive_machine(Memcached.c/4065行),在该函数中处理所有事件,其关键代码如下:

static void drive_machine(conn *c) {

...

while (!stop) {

switch(c->state) {

case conn_listening:

addrlen = sizeof(addr);

if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {

...

}

...

if (settings.maxconns_fast &&

stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {

...

} else {

dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,

DATA_BUFFER_SIZE, tcp_transport);

}

stop = true;

break;

...

}

}

return;

}

在处理事件时,如果是listening事件,则调用函数dispatch_conn_new(Memcached.c/3785行)将accept fd分配给工作线程。

(10)在dispatch_conn_new函数中,根据round-robin算法(Thread.c/450行)将新连接push到所分配线程(Thread.c/452行)的CQ队列中(Thread.c/462行),并通过管道发送通知消息“c”(Thread.c/466行),关键代码如下:

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,

int read_buffer_size, enum network_transport transport) {

CQ_ITEM *item = cqi_new();

char buf[1];

int tid = (last_thread + 1) % settings.num_threads;

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

...

cq_push(thread->new_conn_queue, item);

...

buf[0] = 'c';

if (write(thread->notify_send_fd, buf, 1) != 1) {

perror("Writing to thread notify pipe");

}

}

dispatch_conn_new函数只在主线程中调用,last_thread为静态变量,每次将该变量值+1,再模线程数来选择工作线程。

3、线程模型

Libevent本身是单线程的,Memcached采用消息通知+同步层机制使得其支持多线程,整体模型见如下神图





图2 线程模型

上图是流传已经的神图,先拿来主义

,源码见前面剖析部分,不再重复,图中也描述得很详细,归纳起来就如下:

每个线程包括主线程都各自有独立的Libevent实例,Memcached的listen fd注册到主线程的Libevent实例上,由主线程来accept新的连接,接受新的连接后根据Round-robin算法选择工作线程,将新连接的socket fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到工作线程(notify_receive_fd),而notify_receive_fd已经注册到工作线程的Libevent实例上了,这样工作线程就能收到通知“c”,然后从该工作线程的CQ队列中pop出CQ_ITEM进而取出新连接并将fd注册到工作线程的Libevent实例上,从而由工作线程来处理该连接的所有后续事件。

需要注意的数据:Memcached默认开启线程数为4(Memcached.c/216行),也可以通过参数-t来指定开启线程数(Memcached.c/4865行),当线程数大于64时会给出错误提示(Memcached.c/4874行),建议线程数为小于或等于CPU核数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: