您的位置:首页 > 理论基础 > 计算机网络

【slighttpd】基于lighttpd架构的Server项目实战(4)—简单的echo服务器

2016-02-04 23:23 567 查看
在这一部分,我们将对上一篇中的master-worker进行拓展,成为一个简单的echo服务器。

这一步我们需要添加两个类:Listener和Connection;

Listener的职责:

创建监听套接字;

注册监听套接字事件;

在监听事件的回调函数中进行accept并创建新连接;

其头文件如下:

[code]/*************************************************************************
    > File Name: listener.h
    > Author: Jiange
    > Mail: jiangezh@qq.com 
    > Created Time: 2016年01月27日 星期三 19时46分34秒
 ************************************************************************/

#ifndef _LISTENER_H
#define _LISTENER_H

#include <string>

#include "event2/event.h"
#include "event2/util.h"

#include "util.h"

class Worker;

class Listener
{
    public:
        Listener(const std::string &ip, unsigned short port);
        ~Listener();

        bool InitListener(Worker *worker);
        void AddListenEvent();

        static void ListenEventCallback(evutil_socket_t fd, short event, void *arg);

        Worker             *listen_worker;
        evutil_socket_t     listen_sockfd;
        struct sockaddr_in  listen_addr;
        struct event       *listen_event;
        uint64_t            cnt_connection;
};

#endif


接下来看看具体实现:

[code]/*************************************************************************
    > File Name: listener.cpp
    > Author: Jiange
    > Mail: jiangezh@qq.com 
    > Created Time: 2016年01月27日 星期三 19时48分56秒
 ************************************************************************/

#include "listener.h"
#include "worker.h"
#include "connection.h"

#include<iostream>

Listener::Listener(const std::string &ip, unsigned short port)
{
    //ipv4
    listen_addr.sin_family      = AF_INET;
    listen_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    listen_addr.sin_port        = htons(port);
    listen_event                = NULL;
    cnt_connection  = 0;
    std::cout << "Init listener" << std::endl;
}

Listener::~Listener()
{
    if (listen_event)
    {
        event_free(listen_event);
        close(listen_sockfd);
    }
    std::cout << "Listener closed" << std::endl;
}

bool Listener::InitListener(Worker *worker)
{
    if (-1 == (listen_sockfd = socket(AF_INET, SOCK_STREAM, 0)))
    {
        return false;
    }

    //非阻塞
    evutil_make_socket_nonblocking(listen_sockfd);

    int reuse = 1;

    //重用
    setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    if (0 != bind(listen_sockfd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)))
    {
        return false;
    }
    if (0 != listen(listen_sockfd, 5))
    {
        return false;
    }

    listen_worker = worker;
    return true;
}

/* 这里单独作为一个函数,而不是合并到上面函数中。
 * 因为InitListener是在fork之前调用的,此时
 * worker的w_base还未赋值;
 */
void Listener::AddListenEvent()
{
    //echo先从客户端读取数据,故此处监听读
    listen_event  = event_new(listen_worker->w_base, listen_sockfd, EV_READ | EV_PERSIST, Listener::ListenEventCallback, this);
    event_add(listen_event, NULL);
}

void Listener::ListenEventCallback(evutil_socket_t sockfd, short event, void *arg)
{
    Listener *listener  = (Listener*)arg;
    Connection *con     = new Connection();

    socklen_t addr_len  = sizeof(con->con_addr);
    /* 需要处理惊群,暂时忽略,可以考虑加锁
     * 但是加锁是否会引起变相惊群?
     * 考虑解决方案:同一时间只能有一个进程监听本套接字
     * 可通过event的增删来实现,需要考虑负载均衡。
     */
    if (-1 == (con->con_sockfd = accept(sockfd, (struct sockaddr*)&con->con_addr, &addr_len)))
    {

        delete con;
        return ;
        //这里不应该退出,不然尝试失败的程序就结束了
/*      
        if (errno != EAGAIN && errno != EINTR)
        {
            event_base_loopexit(listener->worker->w_base, NULL);
        }
*/
    }
    pid_t pid = getpid();
    std::cout << "listen accept: " << con->con_sockfd << " by process " << pid <<std::endl;

    if (!con->InitConnection(listener->listen_worker))
    {
        Connection::FreeConnection(con);
        return ;
    }

    con->con_worker->con_map[con->con_sockfd] = con;
    ++listener->cnt_connection;

}


接下来看看Connection:

一个Connection实例即代表一个连接,它维护从listener的回调函数那里accept得到的套接字,并在该套接字上监听读写事件,进行request和response。

[code]/*************************************************************************
    > File Name: connection.h
    > Author: Jiange
    > Mail: jiangezh@qq.com 
    > Created Time: 2016年01月27日 星期三 20时10分35秒
 ************************************************************************/

#ifndef _CONNECTION_H
#define _CONNECTION_H

#include <string>
#include <queue>

#include "event2/event.h"
#include "event2/util.h"

#include "util.h"

class Worker;

class Connection
{
    public:
        Connection();
        ~Connection();

        bool InitConnection(Worker *worker);

        static void ConEventCallback(evutil_socket_t fd, short event, void *arg);

        Worker             *con_worker;

        evutil_socket_t     con_sockfd;
        struct sockaddr_in  con_addr;
        struct event       *con_event;  
        //可改进的地方:这里用两个event,一个注册读,一个注册写会效率高点
        //struct event     *write_event;
        //struct event     *read_event;

        std::string         con_inbuf;
        std::string         con_intmp;
        std::string         con_outbuf;

        static void FreeConnection(Connection *con);

    private:
        void WantRead();
        void NotWantRead();
        void WantWrite();
        void NotWantWrite(); 

};

#endif


目前我只使用了一个event,所以在读写事件的切换时,需要进行del,assign,add三个操作,如果使用两个event分别负责读写,则可以省去assign这一步,以提高效率~

在回调函数的设计中,本来打算使用两个回调函数:一个处理读,一个处理写。

不过其实可以合并到同一个回调函数里,所以还是在一个函数中处理,并增加4个函数,来进行监听事件的切换,这样做更有利于后面状态机的拓展:

[code]void Connection::WantRead()
{
    short event = event_get_events(con_event);
    event_del(con_event);
    event_assign(con_event, con_worker->w_base, con_sockfd, event | EV_READ, Connection::ConEventCallback, this);
    event_add(con_event, NULL); 
}

void Connection::NotWantRead()
{
    short event = event_get_events(con_event);
    event_del(con_event);
    event_assign(con_event, con_worker->w_base, con_sockfd, event & (~EV_READ), Connection::ConEventCallback, this);
    event_add(con_event, NULL); 
}

void Connection::WantWrite()
{
    short event = event_get_events(con_event);
    event_del(con_event);
    event_assign(con_event, con_worker->w_base, con_sockfd, event | EV_WRITE, Connection::ConEventCallback, this);
    event_add(con_event, NULL); 
}

void Connection::NotWantWrite() 
{
    short event = event_get_events(con_event);
    event_del(con_event);
    event_assign(con_event, con_worker->w_base, con_sockfd, event & (~EV_WRITE), Connection::ConEventCallback, this);
    event_add(con_event, NULL);
}


除以上函数外其他部分的具体实现:

[code]/*************************************************************************
    > File Name: connection.cpp
    > Author: Jiange
    > Mail: jiangezh@qq.com 
    > Created Time: 2016年01月28日 星期四 12时06分22秒
 ************************************************************************/

#include "connection.h"
#include "worker.h"

#include<iostream>

Connection::Connection()
{
    con_worker = NULL;

    con_event    = NULL;
}

Connection::~Connection()
{
    if (con_event)
    {
        event_free(con_event);
        std::cout << con_sockfd << " closed" << std::endl;
        close(con_sockfd);
    }
}

/* 删除worker中相应的con,并释放该con */
void Connection::FreeConnection(Connection *con)
{
    Worker *worker = con->con_worker;

    if (con->con_event)
    {
        Worker::ConnectionMap::iterator con_iter = worker->con_map.find(con->con_sockfd);
        worker->con_map.erase(con_iter);
    }

    delete con;
}

bool Connection::InitConnection(Worker *worker)
{
    con_worker = worker;

    try
    {   
        //这里不能开太大,会爆内存!
        //后期可能需要在内存的使用上进行优化~
        con_intmp.reserve(10 * 1024);
        con_inbuf.reserve(10 * 1024);
        con_outbuf.reserve(10 * 1024);

        evutil_make_socket_nonblocking(con_sockfd);
        //test:监听读事件,从客户端读,然后回显
        con_event = event_new(con_worker->w_base, con_sockfd, EV_READ | EV_PERSIST, Connection::ConEventCallback, this);
    }
    catch(std::bad_alloc)
    {
        std::cout << "InitConnection():bad_alloc" <<std::endl;
    }
    event_add(con_event, NULL);

    return true;
}

/* 循环读写
 * 注意,在读的时候,此处ret为0时,可能是空字符串之类的
 * 所以在这里暂不做处理
 */
void Connection::ConEventCallback(evutil_socket_t sockfd, short event, void *arg)
{

    Connection *con = (Connection*)arg;

    if (event & EV_READ) 
    {
        int cap = con->con_intmp.capacity();
        int ret = read(sockfd, &con->con_intmp[0], cap);

        if (ret == -1)
        {
            if (errno != EAGAIN && errno != EINTR)
            {
                FreeConnection(con);
                return;
            }
        }
        else if (ret == 0)
        {
            FreeConnection(con); 
            return;
        }
        else
        {
            con->con_inbuf.clear();
            con->con_inbuf.append(con->con_intmp.c_str(), ret);
        }
        con->con_outbuf = con->con_inbuf;
        con->NotWantRead();
        con->WantWrite();
    }

    if (event & EV_WRITE)
    {
        int ret = write(sockfd, con->con_outbuf.c_str(), con->con_outbuf.size());

        if (ret == -1)
        {
            if (errno != EAGAIN && errno != EINTR)
            {
                FreeConnection(con);
                return;
            }
        }
        con->NotWantWrite();
        con->WantRead();
    }   
}


介绍完listener和connection之后,我们需要相应地调整master和worker的代码:

1.修改两者的构造函数:

[code]Master::Master(const std::string &ip, unsigned short port)
    :worker(ip, port)
{
    //……
}

Worker::Worker(const std::string &ip, unsigned short port)
        :listener(ip, port)
{
    //……
}


2.在master中开始创建监听套接字:

[code]bool Master::StartMaster()
{
    std::cout << "Start Master" << std::endl;

    if (!worker.listener.InitListener(&worker))
    {
        return false;
    }
    //……
}


3.在worker中增加listener和connection map成员:

[code]class Master;
class Connection;

class Worker
{
    public:
        typedef std::map<evutil_socket_t, Connection*> ConnectionMap;
        //……
        Listener            listener;
        ConnectionMap       con_map;
};


4.worker析构函数释放持有的连接:

[code]Worker::~Worker()
{
    //……

    if (w_base)
    {
        ConnectionMap::iterator con_iter = con_map.begin();
        while (con_iter != con_map.end())
        {
            Connection *con = con_iter->second;
            delete con;
            ++con_iter;
        }
        event_base_free(w_base);
    }
}


5.worker增加监听event事件:

[code]void Worker::Run()
{
    w_base = event_base_new();
    listener.AddListenEvent();
    //……
    return;
}


6.修改main函数中的master构造;

最后,附上一个使用到的头文件:

[code]/*************************************************************************
    > File Name: util.h
    > Author: Jiange
    > Mail: jiangezh@qq.com 
    > Created Time: 2016年01月28日 星期四 10时39分22秒
 ************************************************************************/

#ifndef _UTIL_H
#define _UTIL_H

#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdint.h>

#endif


至此,我们简单的echo服务器就大功告成啦~

下面给出makefile:

[code]THIRD_LIBS=-levent
LIBS=-ldl
CFLAGS=-I./include

master:src/master.o src/worker.o src/listener.o src/connection.o src/main.o
        g++ -g -o $@ src/master.o src/worker.o src/listener.o src/connection.o src/main.o $(THIRD_LIBS) $(LIBS)

src/master.o:src/master.cpp include/master.h
        g++ -g -o $@ -c $< $(CFLAGS)

src/worker.o:src/worker.cpp include/worker.h include/util.h
        g++ -g -o $@ -c $< $(CFLAGS)

src/listener.o:src/listener.cpp include/listener.h include/util.h
        g++ -g -o $@ -c $< $(CFLAGS)

src/connection.o:src/connection.cpp include/connection.h include/util.h
        g++ -g -o $@ -c $< $(CFLAGS)

src/main.o:src/main.cpp include/master.h
        g++ -g -o $@ -c $< $(CFLAGS)

clean:
        rm -f src/*.o master


我用python写了个用于测试的客户端:

[code]#python2.7.6
#coding=utf-8

import socket

if __name__ == "__main__":
    sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sockfd.connect(('localhost', 8000))
    message = ""
    while 1:
        message = raw_input("Please input:")
        sockfd.send(message)
        message = sockfd.recv(8000)
        print message
    sockfd.close()


另外一个用于测试的:

[code]#python2.7.6
#coding=utf-8

import socket
import time
import threading

http_request = "POST /test_server HTTP/1.1\r\nHost:test.py\r\nContent-Length:5\r\n\r\nHello"

def make_a_request():
    sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sockfd.connect(('localhost', 8000))
    sockfd.sendall(http_request)
    sockfd.recv(8000)
    sockfd.close()

if __name__ == "__main__":
    thread_list = []

    start_time = time.time()

    for i in range(0, 1000):
        thread = threading.Thread(target = make_a_request)
        thread_list.append(thread)
        thread.start()

    for thread in thread_list:
        thread.join()

    print "Time used for 1000 request: ", time.time() - start_time


另外,由于是多进程,所以需要测试一下并发下的运转情况~

(可以使用webbench~)

本程序在github的源代码

接下来,我们将引入状态机机制,并开始进行一些http的请求与处理!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: