您的位置:首页 > 理论基础 > 计算机网络

Tornado HTTP服务器的基本流程Tornado 底层I/O的内部实现

2018-02-12 16:28 561 查看
本小节介绍Tornado HTTP服务器的基本流程,分别分析httpserver, ioloop, iostream模块的代码来剖析Tornado底层I/O的内部实现。httpserver.py中给出了一个简单的http服务器的demo,代码如下所示:
01
from
 
tornado 
import
 
httpserver
02
from
 
tornado 
import
 
ioloop
03
 
 
04
def
 
handle_request(request):
05
   
message 
=
 
"You requested %s\n"
 
%
 
request.uri
06
   
request.write(
"HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s"
 
%
 
(
07
                 
len
(message), message))
08
   
request.finish()
09
 
 
10
http_server 
=
 
httpserver.HTTPServer(handle_request)
11
http_server.bind(
8888
)
12
http_server.start()
13
ioloop.IOLoop.instance().start()
该http服务器主要使用到IOLoop, IOStream, HTTPServer, HTTPConnection几大模块,分别在代码ioloop.py, iostream.py, httpserver.py中实现。工作的流程如下图所示:

服务器的工作流程:首先按照socket->bind->listen顺序创建listen socket监听客户端,并将每个listen socket的fd注册到IOLoop的单例实例中;当listen socket可读时回调_handle_events处理客户端请求;在与客户端通信的过程中使用IOStream封装了读、写缓冲区,实现与客户端的异步读写。

HTTPServer分析

HTTPServer在httpserver.py中实现,继承自TCPServer(netutil.py中实现),是一个无阻塞、单线程HTTP服务器。支持HTTP/1.1协议keep-alive连接,但不支持chunked encoding。服务器支持'X-Real-IP'和'X-Scheme'头以及SSL传输,支持多进程为prefork模式实现。在源代码的注释中对以上描述比较详细的说明,这里就不再细说。HTTPServer和TCPServer的类结构:
1
class
 
HTTPServer(TCPServer):
2
    
def
 
__init__(
self
, request_callback, no_keep_alive
=
False
, io_loop
=
None
, xheaders
=
False
, ssl_options
=
None
*
*
kwargs):
3
    
def
 
handle_stream(
self
, stream, address):
1
class
 
TCPServer(
object
):
2
    
def
 
__init__(
self
, io_loop
=
None
, ssl_options
=
None
):
3
    
def
 
listen(
self
, port, address
=
""):
4
    
def
 
add_sockets(
self
, sockets):
5
    
def
 
bind(
self
, port, address
=
None
, family
=
socket.AF_UNSPEC, backlog
=
128
):
6
    
def
 
start(
self
, num_processes
=
1
):
7
    
def
 
stop(
self
):
8
    
def
 
handle_stream(
self
, stream, address):
9
    
def
 
_handle_connection(
self
, connection, address):
文章开始部分创建HTTPServer的过程:首先需要定义处理request的回调函数,在tornado中通常使用tornado.web.Application封装。然后构造HTTPServer实例,注册回调函数。接下来监听端口,启动服务器。最后启动IOLoop。
01
def
 
listen(
self
, port, address
=
""):
02
    
sockets 
=
 
bind_sockets(port, address
=
address)
03
    
self
.add_sockets(sockets)
04
 
05
def
 
bind_sockets(port, address
=
None
, family
=
socket.AF_UNSPEC, backlog
=
128
):
06
    
# 省略sockets创建,address,flags处理部分代码
07
    
for
 
res 
in
 
set
(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
08
                                  
0
, flags)):
09
        
af, socktype, proto, canonname, sockaddr 
=
 
res
10
        
# 创建socket
11
        
sock 
=
 
socket.socket(af, socktype, proto)
12
        
# 设置socket属性,代码省略
13
        
 
14
        
sock.bind(sockaddr)
15
        
sock.listen(backlog)
16
        
sockets.append(sock)
17
    
return
 
sockets
18
 
19
def
 
add_sockets(
self
, sockets):
20
    
if
 
self
.io_loop 
is
 
None
:
21
        
self
.io_loop 
=
 
IOLoop.instance()
22
 
23
    
for
 
sock 
in
 
sockets:
24
        
self
._sockets[sock.fileno()] 
=
 
sock
25
        
add_accept_handler(sock, 
self
._handle_connection,
26
                           
io_loop
=
self
.io_loop)
27
                           
 
28
def
 
add_accept_handler(sock, callback, io_loop
=
None
):
29
    
if
 
io_loop 
is
 
None
:
30
        
io_loop 
=
 
IOLoop.instance()
31
 
32
    
def
 
accept_handler(fd, events):
33
        
while
 
True
:
34
            
try
:
35
                
connection, address 
=
 
sock.accept()
36
            
except
 
socket.error, e:
37
                
if
 
e.args[
0
in
 
(errno.EWOULDBLOCK, errno.EAGAIN):
38
                    
return
39
                
raise
40
            
# 当有连接被accepted时callback会被调用
41
            
callback(connection, address)
42
    
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
43
    
 
44
def
 
_handle_connection(
self
, connection, address):
45
    
# SSL部分省略
46
    
try
:
47
        
stream 
=
 
IOStream(connection, io_loop
=
self
.io_loop)
48
        
self
.handle_stream(stream, address)
49
    
except
 
Exception:
50
        
logging.error(
"Error in connection callback"
, exc_info
=
True
)
这里分析HTTPServer通过listen函数启动监听,这种方法是单进程模式。另外可以通过先后调用bind和start(num_processes=1)函数启动监听同时创建多进程服务器实例,后文有关于此的详细描述。bind_sockets在启动监听端口过程中调用,getaddrinfo返回服务器的所有网卡信息, 每块网卡上都要创建监听客户端的请求并返回创建的sockets。创建socket过程中绑定地址和端口,同时设置了fcntl.FD_CLOEXEC(创建子进程时关闭打开的socket)和socket.SO_REUSEADDR(保证某一socket关闭后立即释放端口,实现端口复用)标志位。sock.listen(backlog=128)默认设定等待被处理的连接最大个数为128。返回的每一个socket都加入到IOLoop中同时添加回调函数_handle_connection,IOLoop添加对相应socket的IOLoop.READ事件监听。_handle_connection在接受客户端的连接处理结束之后会被调用,调用时传入连接和ioloop对象初始化IOStream对象,用于对客户端的异步读写;然后调用handle_stream,传入创建的IOStream对象初始化一个HTTPConnection对象,HTTPConnection封装了IOStream的一些操作,用于处理HTTPRequest并返回。至此HTTP Server的创建、启动、注册回调函数的过程分析结束。

HTTPConnection分析

该类用于处理http请求。在HTTPConnection初始化时对self.request_callback赋值为一个可调用的对象(该对象用于对http请求的具体处理和应答)。该类首先读取http请求中header的结束符b("\r\n\r\n"),然后回调self._on_headers函数。request_callback的相关实现在以后的系列中有详细介绍。
01
def
 
__init__(
self
, stream, address, request_callback, no_keep_alive
=
False
,
02
                 
xheaders
=
False
):
03
    
self
.request_callback 
=
 
request_callback
04
    
# some configuration code
05
    
self
._header_callback 
=
 
stack_context.wrap(
self
._on_headers)
06
    
self
.stream.read_until(b(
"\r\n\r\n"
), 
self
._header_callback)
07
 
08
def
 
_on_headers(
self
, data):
09
    
# some codes
10
    
self
.request_callback(
self
._request)

多进程HTTPServer

Tornado的HTTPServer是单进程单线程模式,同时提供了创建多进程服务器的接口,具体实现是在主进程启动HTTPServer时通过process.fork_processes(num_processes)产生新的服务器子进程,所有进程之间共享端口。fork_process的方法在process.py中实现,十分简洁。对fork_process详细的分析,可以参考 番外篇:Tornado的多进程管理分析。FriendFeed使用nginx提供负载均衡、反向代理服务并作为静态文件服务器,在后端服务器上可以部署多个Tornado实例。一般可以通过Supervisor控制Tornado app,然后再通过nginx对Tornado的输出进行反向代理。 具体可以参考下这篇文章: Supervisord进程管理工具的安装使用
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: