在 tornado 中异步无阻塞的执行耗时任务
2016-03-11 19:29
309 查看
在 tornado 中异步无阻塞的执行耗时任务
在 linux 上 tornado 是基于 epoll 的事件驱动框架,在网络事件上是无阻塞的。但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能服务器称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应。
在 python 3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法。
如果是在 2.7 之类的 python 版本上,可以使用
关于这个库的具体使用,这里就不详细展开了,可以去看官方文档,需要注意的是,前两个例子是示例错误的用法,可能会产生死锁。
下面说说如何在 tornado 中结合使用 futures 库,最好的参考莫过于有文档+代码。正好, tornado 中解析 ip 使用的 dns 解析服务是多线程无阻塞的。(
我们来看看它的实现,看看如何应用到我们的程序中来。
从
再来看看
关于
简单的说,这里对传递进来的函数进行了封装,并用
对比官方的
所以 tornado 中解析 dns 使用的多线程无阻塞的方法的实质就是使用了
上面就是一个基本的使用方法,下面展示一个使用 sleep() 来模拟耗时的完整程序。
此时先调用 127.0.0.1:8888/sleep/10 不会阻塞 127.0.0.1:8888/ 了。
以上,就是完整的在 tornado 中利用多线程来执行耗时的任务。
当然也有其他的方法,比如使用 celery 来调度执行耗时太多的任务,比如频繁的需要写入数据到不同的文件中,我公司的一个项目中,需要把数据写入四千多个文件中,每天产生几亿条数据,就是使用了 tornado + redis + celery 的方法来高效的执行写文件任务。
完。
原文地址:在 tornado 中异步无阻塞的执行耗时任务, 感谢原作者分享。关键词:
在 linux 上 tornado 是基于 epoll 的事件驱动框架,在网络事件上是无阻塞的。但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能服务器称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应。
在 python 3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法。
如果是在 2.7 之类的 python 版本上,可以使用
pip install futures来安装这个库。
关于这个库的具体使用,这里就不详细展开了,可以去看官方文档,需要注意的是,前两个例子是示例错误的用法,可能会产生死锁。
下面说说如何在 tornado 中结合使用 futures 库,最好的参考莫过于有文档+代码。正好, tornado 中解析 ip 使用的 dns 解析服务是多线程无阻塞的。(
netutils.ThreadedResolver)
我们来看看它的实现,看看如何应用到我们的程序中来。
tornado 中使用多线程无阻塞来处理 dns 请求
# 删除了注释 class ThreadedResolver(ExecutorResolver): _threadpool = None _threadpool_pid = None def initialize(self, io_loop=None, num_threads=10): threadpool = ThreadedResolver._create_threadpool(num_threads) super(ThreadedResolver, self).initialize( io_loop=io_loop, executor=threadpool, close_executor=False) @classmethod def _create_threadpool(cls, num_threads): pid = os.getpid() if cls._threadpool_pid != pid: # Threads cannot survive after a fork, so if our pid isn't what it # was when we created the pool then delete it. cls._threadpool = None if cls._threadpool is None: from concurrent.futures import ThreadPoolExecutor cls._threadpool = ThreadPoolExecutor(num_threads) cls._threadpool_pid = pid return cls._threadpool
ThreadedResolver是
ExecutorEesolver的子类,看看它的是实现。
class ExecutorResolver(Resolver): def initialize(self, io_loop=None, executor=None, close_executor=True): self.io_loop = io_loop or IOLoop.current() if executor is not None: self.executor = executor self.close_executor = close_executor else: self.executor = dummy_executor self.close_executor = False def close(self): if self.close_executor: self.executor.shutdown() self.executor = None @run_on_executor def resolve(self, host, port, family=socket.AF_UNSPEC): addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM) results = [] for family, socktype, proto, canonname, address in addrinfo: results.append((family, address)) return results
从
ExecutorResolver的实现可以看出来,它的关键参数是
ioloop和
executor,干活的
resolve函数被
@run_on_executor修饰,结合起来看
ThreadedResolver的实现,那么这里的
executor就是
from concurrent.futures import ThreadPoolExecutor
再来看看
@run_on_executor的实现。
run_on_executor的实现在
concurrent.py文件中,它的源码如下:
def run_on_executor(fn): @functools.wraps(fn) def wrapper(self, *args, **kwargs): callback = kwargs.pop("callback", None) future = self.executor.submit(fn, self, *args, **kwargs) if callback: self.io_loop.add_future(future, lambda future: callback(future.result())) return future return wrapper
关于
functions.wraps()的介绍可以参考官方文档 functools — Higher-order functions and operations on callable objects
简单的说,这里对传递进来的函数进行了封装,并用
self.executor.submit()对包装的函数进行了执行,并判断是否有回调,如果有,就加入到 ioloop 的 callback 里面。
对比官方的
concurrent.futures.Executor的接口,里面有个
submit()方法,从头至尾看看
ThreadedResolver的实现,就是使用了
concurrent.futures.ThreadPoolExecutor这个
Executor的子类。
所以 tornado 中解析 dns 使用的多线程无阻塞的方法的实质就是使用了
concurrent.futures提供的
ThreadPoolExecutor功能。
使用多线程无阻塞方法来执行耗时的任务
借鉴 tornado 的使用方法,在我们自己的程序中也使用这种方法来处理耗时的任务。from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor class LongTimeTask(tornado.web.RequestHandler): executor = ThreadPoolExecutor(10) @run_on_executor() def get(self, data): long_time_task(data)
上面就是一个基本的使用方法,下面展示一个使用 sleep() 来模拟耗时的完整程序。
#!/usr/bin/env python #-*-coding:utf-8-*- import tornado.ioloop import tornado.web import tornado.httpserver from concurrent.futures import ThreadPoolExecutor from tornado.concurrent import run_on_executor import time class App(tornado.web.Application): def __init__(self): handlers = [ (r'/', IndexHandler), (r'/sleep/(\d+)', SleepHandler), ] settings = dict() tornado.web.Application.__init__(self, handlers, **settings) class BaseHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(10) class IndexHandler(tornado.web.RequestHandler): def get(self): self.write("Hello, world %s" % time.time()) class SleepHandler(BaseHandler): @run_on_executor def get(self, n): time.sleep(float(n)) self._callback() def _callback(self): self.write("after sleep, now I'm back %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) if __name__ == "__main__": app = App() server = tornado.httpserver.HTTPServer(app, xheaders=True) server.listen(8888) tornado.ioloop.IOLoop.instance().start()
此时先调用 127.0.0.1:8888/sleep/10 不会阻塞 127.0.0.1:8888/ 了。
以上,就是完整的在 tornado 中利用多线程来执行耗时的任务。
结语
epoll 的好处确实很多,事件就绪通知后,上层任务函数执行任务,如果任务本身需要较耗时,那么就可以考虑这个方法了,当然也有其他的方法,比如使用 celery 来调度执行耗时太多的任务,比如频繁的需要写入数据到不同的文件中,我公司的一个项目中,需要把数据写入四千多个文件中,每天产生几亿条数据,就是使用了 tornado + redis + celery 的方法来高效的执行写文件任务。
完。
原文地址:在 tornado 中异步无阻塞的执行耗时任务, 感谢原作者分享。关键词:
相关文章推荐
- Fibonacci数
- Java自定义类数组的创建和使用
- HttpClient4.5教程-第三章-HTTP状态管理
- 在maven中安装oracle11g驱动
- BZOJ 1051 受欢迎的牛
- private,public,protected 的作用
- android onSaveInstanceState方法
- django no module splite3的问题
- PHP 对于 MYSQL 基础操作
- A. Again Twenty Five!
- 梳理
- 学习进度条
- C++ Primer Plus学习:第四章
- 各品牌机组装机重装系统按键汇总
- [Java] 算法学习之旅 [0] -- 求任意三位数各个数位上数字的和
- 工作感悟2016-3-11
- 第3周项目输出星号
- 包装类和工具类
- 随机生成四则运算2
- java property.js与jquery兼容修改property.js源文件