您的位置:首页 > 数据库 > Memcache

Memcached线程模型分析

2015-08-30 22:30 477 查看
前面提到过Memcached的线程模型:Memcached使用Libevent,以此为基础来处理事件。其原理为:启动时的线程为main thread,它包含一个event_base,之后创建多个worker thread;每个work thread中也有一个event_base。main thread中的event_base负责监听网络,接收新连接;当建立连接后就把新连接交给worker thread来处理。

Memcached的main函数在Memcached.c中。main函数中初始化了系统设置,初始化多线程:

1、初始化了main thread中的event_base、初始化多线程、初始化网络

/* initialize main thread libevent instance */
    main_base = event_init();//初始化main event_base
……
/* start up worker threads if MT mode */
    memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
……
//初始化socket并监听。这里假设使用的是TCP
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }


2、接着main thread初始化worker thread。每个worker thread都对应一个结构体
LIBEVENT_THREAD
,包含其相关信息

//定义在Memcached.h中
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;


了解了这个结构体,再来看一下main thread和worker thread的初始化过程。

memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
//函数memcached_thread_init定义在Thread.c中
void memcached_thread_init(int nthreads, struct event_base *main_base) {

    ……
    ……
     //LIBEVENT_THREAD包含了worker thread的相关信息,给worker thread的标识信息分配内存。
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;//main thread的event_base
    dispatcher_thread.thread_id = pthread_self();//main thread的ID

    //为了能让main thread和workerthread通信,创建管道。worker thread监听管道的可读,当main thread需要
    //唤醒worker thread时,向管道写信息即可。
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        //把创建的管道和每个worker thread关联
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);//初始化每个worker thread
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    //创建并启动worker thread。worker_libevent是worker线程的启动函数
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    //等待所有worker thread都启动
    pthread_mutex_lock(&init_lock);
    //每个线程启动后都会notify信号量,这里等待nthreads次notify。
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}


在上面的代码中,
setup_thread
设置worker thread的event相关信息;
create_worker
是创建并启动线程。

//setup_thread函数
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();//worker thread的event_base

    /* Listen for notifications from other threads */
    //监听创建的管道事件,事件处理函数为thread_libevent_process
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);//将事件和其event_base关联

    if (event_add(&me->notify_event, 0) == -1) {//将事件添加到event_base
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));//为worker thread创建监听事件队列

    cq_init(me->new_conn_queue);//初始化这个队列

    ……
}

//create_worker函数
static void create_worker(void *(*func)(void *), void *arg) {
    ……
    //创建线程,func为线程启动函数。这里func为worker_libevent,arg为threads[i]
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}


再来看线程的启动函数func(worker_libevent)

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */

    register_thread_initialized();//这里向main thread发起信号量的notify。main thread在wait_for_thread_registration等待每个线程都启动

    event_base_loop(me->base, 0);
    return NULL;
}


到了这里多线程模型已经基本创建完成了。

3、main thread创建listen socket并监听。

//初始化socket并监听
main()
{
    ……
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }
    ……
}

//函数server_sockets。其实内部又调用了一个函数server_socket,看这个函数
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;//判断是TCP还是UDP

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {//创建socket fd
            continue;
        }

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));//设置sfd属性

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {//绑定地址
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
        }

        if (IS_UDP(transport)) {
            ……
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,//在conn_new中监听连接
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);
    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}


在conn_new设置监听连接。这里用到了一个连接的结构体
conn
,字段比较多,那几个字段看一下

//在Memcached.h中
typedef struct conn conn;
struct conn {
    int    sfd;//连接的fd
    sasl_conn_t *sasl_conn;
    bool authenticated;
    enum conn_states  state;//连接状态
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;//连接时间
    short  ev_flags;
   ……
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};


接下来看
conn_new
函数

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    ……
    ……

    if (transport == tcp_transport && init_state == conn_new_cmd) {//如果是新连接,则初始化
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    ……

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//创建监听的event,事件处理函数为event_handler
    event_base_set(base, &c->event);//将event和event_base相关联
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {//添加event事件
        perror("event_add");
        return NULL;
    }

    ……
    return c;
}


4、再来看一下多线程模型中的事件处理,即main thread和worker thread如何协同工作处理事件。main thread的事件处理函数为
event_handler
,worker thread的事件处理函数为
thread_libevent_process


先来看main thread的事件处理函数
event_handler


void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;//arg是新建的连接
    assert(c != NULL);

    c->which = which;
    ……
    drive_machine(c);//这里处理连接

    /* wait for next event */
    return;
}

//函数drive_machine
static void drive_machine(conn *c) {

    while (!stop) {

        switch(c->state) {//根据连接状态来处理
        case conn_listening:
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);//接收连接

            if () {
               ……
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
            ……        
        }
    }
    return;
}

//函数dispatch_conn_new。在Thread.c中
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();//创建结构体CQ_ITEM,用来保存连接信息
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;//使用Round Robin分配worker thread

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    //给结构体CQ_ITEM赋值
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);//将item添加到worker thread

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {//通知worker thread
        perror("Writing to thread notify pipe");
    }
}


至此,main thread已经将连接交给了worker thread。接下来看worker thread怎么处理

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c'://main thread给worker thread插入item后,发送了'c'
    item = cq_pop(me->new_conn_queue);//取出这个Item

    if (NULL != item) {
        //设置这个连接的event事件,并添加到event_base
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to pause and report in */
    case 'p':
    register_thread_initialized();
        break;
    }
}


至此,Memcached中,线程如何创建、关联event_base、设置event_handle、main thread和worker thread如何通信等问题已经基本理清,细节暂不考虑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: