您的位置:首页 > 移动开发 > IOS开发

Tornado 的 IOStream 简介与应用

2014-12-16 09:14 417 查看
Tornado的核心源码是由ioloop.py和iostream.py这2个文件组成的。前者提供了一个循环,用于处理I/O事件;后者则封装了一个非阻塞的socket。

有了这2者后,就能搭建起TCP server和HTTP server,实现异步HTTP客户端,这便是Tornado的主要内容了。

之前在研究socket时已差不多弄懂了ioloop的逻辑,于是本文就接着研究iostream了。

这里我并不想逐行分析iostream的源码,因为并不是什么很难懂的代码,于是只说说它做了些什么事。

先看IOStream的__init__()方法,重要的参数只有socket和io_loop这2个。所以很容易猜想到,它只是封装了这个socket,然后把I/O事件注册到io_loop上。

读了源码就能很快证实我们的猜测,值得注意的是,封装socket时还将其设为非阻塞的了。

此后在连接和读写时都调用了_add_io_state()方法,这个方法就是调用io_loop的add_handler()和update_handler()方法来注册事件的。

此外,似乎没有开放读完所有数据的接口,要自行实现的话可以使用_read_to_buffer()方法。

看完源码可能还是一头雾水,它究竟能用来干啥呢?

最为重要的用处当然是实现TCPServer了。有了ioloop,一个服务器就可以处理I/O事件了;可是I/O并不能凭空产生啊,它还需要通过socket连接和传输。

而正如前面所说,IOStream封装了socket,把它变成了非阻塞的,每次连接和读写这些I/O事件都注册到io_loop上,这样就实现了一个完整的TCPServer了。

可惜Tornado源码里的TCPServer并没有实际功能,HTTPServer又太复杂了,所以我找了一段echo server的代码,一看就知道用了。测试也很简单,用telnet连上去后,输入什么就会回显什么。

另一个功能还是和非阻塞有关。

Tornado能够以单线程处理高并发,靠的就是非阻塞。而假如其中的任何一次I/O是非阻塞的,需要消耗数毫秒甚至数秒,那么每秒能处理的请求数就不可能超过1000了。

所以这些耗时很长I/O访问必须封装成非阻塞的,而这早就被IOStream做好了。

此外,为了提高服务的响应速度,也需要用到异步处理。

以前几天我写的聊天室为例,当用户发送了一条信息后,我需要把它广播给所有的用户,然后才能结束这次响应。

在这个例子中,用户可能不需要等待多久。可是假如我还需要做一些复杂的处理,比如关键字过滤,分析@用户名、URL,保存到数据库,或者发送email等,我并不想让用户的请求一直被阻塞着。

那么我可以搭建另一个TCPServer,将接收到的信息通过IOStream发送给它,然后立即结束响应。在那个TCPServer上,我想做任何耗时的事,都不会拖慢主HTTPServer的响应速度;而一旦完成处理,就可以将结果通过IOStream返回给HTTPServer,让其完成扫尾工作。

这里仍然以聊天室为例,当接收到信息后,我将其发送给EchoServer,并结束响应。

EchoServer返回输入的信息,此时HTTPServer监听到这个事件,就读取并广播信息。

虽然对用户来说,效果是一样的。不过整个系统的扩展性就增强了,你可以把这个EchoServer替换成消息队列、memcache等各种数据源。虽然实现上仍有很大差异,但最重要的思路是一致的。

代码和上次的聊天室差不多,handler部分都一样,我就只贴出更改的部分吧:

import logging

import os.path

import socket

import uuid

import tornado.httpserver

import tornado.ioloop

import tornado.iostream

import tornado.options

import tornado.web

import tornado.websocket


def broadcast_message(message):

for handler in ChatSocketHandler.socket_handlers:

  try:

   handler.write_message(message)

  except:

   logging.error('Error sending message', exc_info=True)


for callback in ChatHandler.callbacks:

  try:

   callback(message)

  except:

   logging.error('Error in callback', exc_info=True)

ChatHandler.callbacks = set()


def send_message(message):

stream.write((message + '\n').encode('utf-8'))


def read_message_from_echo_server():

def broadcast(message):

  broadcast_message(message[:-1])

  read_message_from_echo_server()

stream.read_until('\n', broadcast)


# ...


if __name__ == '__main__':

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)

stream = tornado.iostream.IOStream(s)

stream.connect(('127.0.0.1', 8888))

read_message_from_echo_server()

main()


 

要注意的是,连接EchoServer需要在调用ioloop的start()方法之前,因为这个方法是个死循环,后面的代码没机会执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: