您的位置:首页 > 理论基础 > 计算机网络

memcached分析之网络模型

2015-12-29 10:19 387 查看
memcache源码分析之网络模型

accept/dispatch模型

"主线程统一accept/dispatch子线程"的基础设施:

主线程创建多个子线程(这些子线程也称为worker线程),每一个线程都维持自己的事件循环,即每个线程都有自己的epoll,并且都会调用epoll_wait函数进入事件监听状态。

每一个worker线程(子线程)和主线程之间都用一条管道相互通信。每一个子线程都监听自己对应那条管道的读端。

当主线程想和某一个worker线程进行通信,直接往对应的那条管道写入数据即可。

"主线程统一accept/dispatch子线程"模型的工作流程:

主线程负责监听进程对外的TCP监听端口。当客户端申请连接connect到进程的时候,主线程负责接收accept客户端的连接请求。

然后主线程选择其中一个worker线程,把客户端fd通过对应的管道传给worker线程。

worker线程得到客户端的fd后负责和这个客户端进行一切的通信。

memcached模型的区别

1.memcached使用libevent作为进行事件监听;

2.memcached往管道里面写的内容不是fd,而是一个简单的字符。

每一个worker线程都维护一个CQ队列,主线程把fd和一些信息写入一个CQ_ITEM里面,然后主线程往worker线程的CQ队列里面push这个CQ_ITEM。

接着主线程使用管道通知worker线程:“我已经发了一个新客户给你,你去处理吧”

主线程

1.基础设置

2.创建main_base

3.conn_init()

4.thread_init()

5.server_sockets

{

server_socket{conn_new()

}

6.thread_init创建线程,创建一个LIBEVENT_THREAD结构体,为每个线程创建一个CQ队列,为每个线程创建event事件,监听管道的读端,

每当管道可读时,触发线程的回调函数thread_libevent_process

7.线程通过server_sockets创建tcp连接

8.调用erver_socket创建socket,绑定socket,并创建conn队列,通过conn_new创建连接的结构体,设置主线程的event事件

监听创建的每一个套接字的fd,当fd可读时,调用event_handler函数

主线程回调函数

event_handler函数

在fd可读的情况下,调用这个函数,函数调用drive_machine

drive_machine函数内部是一个有限状态机,根据state来判断进入哪一操作,主线程的回调函数和worker线程的回调函数最终

都是通过这个函数,由不同的state进入不同的状态进行工作

主线程状态是conn_listening状态,表示等待连接,在有连接到来时,接受连接,选择线程来处理对应的线程来处理,并修改链接的

状态为conn_new_cmd,向子线程监听的管道输入一个字符,使得管道可写,触发子函数的回调函数

子线程回调函数

在管道可读时,调用回调函数,从CQ队列中取出一个item,然后根据item初始化对应的连接,也是调用conn_new函数

这个函数创建连接,设置连接状态为conn_new_cmd,与建立好的连接进行通信

CQ队列

typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {

int sfd;

enum conn_states init_state;

int event_flags;

int read_buffer_size;

enum network_transport transport;

CQ_ITEM *next;

};

/* A connection queue. */

typedef struct conn_queue CQ;

struct conn_queue {

CQ_ITEM *head;//指向队列的第一个节点

CQ_ITEM *tail;//指向队列的最后一个节点

pthread_mutex_t lock; //一个队列就对应一个锁

};

static void cq_init(CQ *cq);

static CQ_ITEM *cq_pop(CQ *cq);

static void cq_push(CQ *cq, CQ_ITEM *item);

LIBEVENT_THREAD结构体

memcached定义了LIBEVENT_THREAD类型的一个全局变量指针threads。当确定了memcached有多少个worker线程后,就会动态申请一个LIBEVENT_THREAD数组,并让threads指向其。

于是每一个worker线程都对应有一个LIBEVENT_THREAD结构体。主线程通过全局变量threads就可以很方便地访问每一个worker线程的CQ队列和通信管道。

typedef struct {

pthread_t thread_id; //线程id

struct event_base *base; //线程所使用的event_base

struct event notify_event;//用于监听管道读事件的event

int notify_receive_fd; //管道的读端fd

int notify_send_fd; //管道的写端fd

struct conn_queue *new_conn_queue; /* queue of new connections to handle */

...

} LIBEVENT_THREAD;

static LIBEVENT_THREAD *threads;

//在面函数中调用thread_init,全局变量main_base

void thread_init(int nthreads, struct event_base *main_base)

{

//申请具有nthreads个元素的LIBEVENT_THREAD数组

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

for (i = 0; i < nthreads; i++)

{

//包括分配管道

//每一个线程配一个event_base,并设置event监听notify_receive_fd的读事件

//同时还为这个线程分配一个conn_queue队列

setup_thread(&threads[i]);

}

for (i = 0; i < nthreads; i++) {

//创建线程,线程函数为worker_libevent, 线程参数为&threads[i]

create_worker(worker_libevent, &threads[i]);

}

}

//work线程从管道读到master线程发过来的消息触发的回调函数为thread_libevent_process

static void setup_thread(LIBEVENT_THREAD *me)

{

me->base = event_init();

event_set(&me->notify_event, me->notify_receive_fd,EV_READ | EV_PERSIST, thread_libevent_process, me);

event_base_set(me->base, &me->notify_event);//将event_base和event相关联

event_add(&me->notify_event, 0);

//创建一个CQ队列

me->new_conn_queue = malloc(sizeof(struct conn_queue));

cq_init(me->new_conn_queue);

}

static void create_worker(void *(*func)(void *), void *arg)

{

pthread_attr_init(&attr);

pthread_create(&thread, &attr, func, arg));

}

CQ_ITEM内存池

memcached在申请一个CQ_ITEM结构体时,并不是直接使用malloc申请的。因为这样做的话可能会导致大量的内存碎片(作为长期运行的服务器进程memcached需要考虑这个问题)。

为此,memcached也为CQ_ITEM使用类似内存池的技术:预分配一块比较大的内存,将这块大内存切分成多个CQ_ITEM。

一次性分配64个CQ_ITEM大小的内存(即预分配),下次调用本函数的时候,直接从之前分配64个中要一个即可。 \

由于是为了防止内存碎片,所以不是以链表的形式放置这64个CQ_ITEM。而是数组的形式。

于是,cqi_free函数就有点特别了。它并不会真正释放.而是像内存池那样归还

static CQ_ITEM *cqi_new(void)

{

//如果cqi_frelist还有可以分配的item

//从头部取出item

if (cqi_freelist) {

item = cqi_freelist;

cqi_freelist = item->next;

}

//没有多余的CQ_ITEM了

if (NULL == item)

{

item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);

//item[0]直接返回为调用者,不用next指针连在一起。调用者负责将

//item[0].next赋值为NULL

for (i = 2; i < ITEMS_PER_ALLOC; i++)

item[i - 1].next = &item[i];

item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;

cqi_freelist = &item[1];

}

return item;

}

static void cqi_free(CQ_ITEM *item)

{

item->next = cqi_freelist;

cqi_freelist = item; //头插法归还

}

主线程工作

int main (int argc, char **argv)

{

......

//对memcached的关键设置取默认值

settings_init();

......

main_base = event_init();

conn_init();

//创建settings.num_threads个worker线程,并且为每个worker线程创建一个CQ队列

//并为这些worker申请各自的event_base,worker线程然后进入事件循环中

thread_init(settings.num_threads, main_base);

if (settings.port && server_sockets(settings.port, tcp_transport,portnumber_file)) {}//tcp_transport是枚举类型

if (event_base_loop(main_base, 0) != 0){}{//主线程进入事件循环

}

中间的server_sockets函数是创建一个监听客户端的socket,并将创建一个event监听该socket的可读事件。

static int server_sockets(int port, enum network_transport transport,FILE *portnumber_file)

{

char *list = strdup(settings.inter);

for (char *p = strtok_r(list, ";,", &b);p != NULL; p = strtok_r(NULL, ";,", &b))

{

......

处理参数,根据给定参数,可能给多个ip或端口,对每个连接进行accept,并管理

server_socket(p, the_port, transport, portnumber_file);

}

}

//interface是一个ip、hostname或者NULL。这个ip字符串后面没有端口号。端口号由参数port指出

//conn_new是函数是关键,memcached为每个连接都进行管理

//conn_new是函数,函数里面会创建一个用于监听socket fd的event,并调用event_add加入到主线程的event_base中。

static int server_socket(const char *interface,int port,enum network_transport transport,FILE *portnumber_file)

{

getaddrinfo(interface, port_buf, &hints, &ai);

for (next= ai; next; next= next->ai_next)

{

new_socket(next);

bind(sfd, next->ai_addr, next->ai_addrlen);

listen(sfd, settings.backlog);

listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base);

}

//将要监听的多个conn放到一个监听队列里面

listen_conn_add->next = listen_conn;

listen_conn = listen_conn_add;

}

static int new_socket(struct addrinfo *ai)

{

sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);

flags = fcntl(sfd, F_GETFL, 0);

fcntl(sfd, F_SETFL, flags | O_NONBLOCK);

}

连接管理者conn

memcached为每一个socket fd(也就是一个连接)都创建一个conn结构体,用于管理这个socket fd(连接)。因为一个连接会有很多数据和状态信息,所以需要一个结构体来负责管理。

conn_init函数,在启动memcached之后就可以确定最多允许多少个客户端同时在线。有了这个数值就不用一有新连接就malloc一个conn结构体(这样会很容易造成内存碎片)。

有了这个数值那么可以在一开始(conn_init函数),就申请动态申请一个数组。有新连接就从这个数组中分配一个元素即可。

conn **conns;

static void conn_init(void)

{

max_fds = settings.maxconns + headroom + next_fd;

注意是conn指针不是conn结构体

conns = calloc(max_fds, sizeof(conn *);

}

conn结构体是比较大的一个结构体(成员变量很多)。不一定会存在settings.maxconns个同时在线的客户端。所以可以等到需要conn结构体的时候再去动态申请。需要时去动态申请,

这样不会产生内存碎片,因为可以循环使用的。如果没有这个conn*指针数组,那么当这个连接断开后就要free这个conn结构体所占的内存(不然就内存泄漏了)。有了这个数组那么就可以不free,由数组管理这个内存。

conn *conn_new(const int sfd, enum conn_states init_state,//init_state值为conn_listening

const int event_flags,

const int read_buffer_size, enum network_transport transport,

struct event_base *base)

{

c = conns[sfd];//直接使用下标

//之前没有哪个连接用过这个sfd值,需要申请一个conn结构体

if (NULL == c)

{

c = (conn *)calloc(1, sizeof(conn));

...//初始化一些成员变量

c->sfd = sfd;

conns[sfd] = c; //将这个结构体交由conns数组管理

}

...//初始化另外一些成员变量

c->state = init_state;//值为conn_listening

//等同于event_assign,会自动关联current_base。event的回调函数是event_handler

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

event_base_set(base, &c->event);

c->ev_flags = event_flags;

event_add(&c->event, 0);

}

回调函数分析

worker线程对于管道可读事件的回调函数是ethread_libevent_process函数。主线程对于socket fd可读事件的回调函数是event_handler函数。

event_handler函数

void event_handler(const int fd, const short which, void *arg)

{

c = (conn *)arg;

drive_machine(c);

}

drive_machine函数内部是一个有限状态机,根据state来判断进入哪一操作,主线程的回调函数和worker线程的回调函数最终

都是通过这个函数,由不同的state进入不同的状态进行工作。

static void drive_machine(conn *c)

{

bool stop = false;

while (!stop)

{

switch(c->state)

{

case conn_listening:

addrlen = sizeof(addr);

sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);

...

//选定一个worker线程,new一个CQ_ITEM,把这个CQ_ITEM仍给这个线程.

dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport);

stop = true;

break;

....

}

}

}

static int last_thread = -1;

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,int read_buffer_size, enum network_transport transport)

{

CQ_ITEM *item = cqi_new();//申请一个CQ_ITEM

int tid = (last_thread + 1) % settings.num_threads;//轮询的方式选定一个worker线程

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

item->sfd = sfd;

item->init_state = init_state;//conn_new_cmd

item->event_flags = event_flags;//EV_READ | EV_PERSIST

item->read_buffer_size = read_buffer_size;//DATA_BUFFER_SIZE(2048)

item->transport = transport;

cq_push(thread->new_conn_queue, item);//把这个item放到选定的worker线程的CQ队列中

buf[0] = 'c';

}

worker线程的回调函数thread_libevent_process

static void thread_libevent_process(int fd, short which, void *arg)

{

LIBEVENT_THREAD *me = arg;

CQ_ITEM *item;

read(fd, buf, 1);

switch (buf[0]) {

case 'c':

//从CQ队列中读取一个item,因为是pop所以读取后,CQ队列会把这个item从队列中删除

item = cq_pop(me->new_conn_queue);

//从这里cq_pop出来的item的state为item->init_state = init_state;//conn_new_cmd,为上次插入的item

if (NULL != item) {

//为sfd分配一个conn结构体,并且为这个sfd建立一个event,然后让base监听这个event

//这个sfd的事件回调函数是event_handler

conn *c = conn_new(item->sfd, item->init_state, item->event_flags,

item->read_buffer_size, item->transport, me->base);

c->thread = me;

cqi_free(item);

}

break;

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: