tornado.gen 模块解析
2015-10-30 13:09
344 查看
转自:http://strawhatfy.github.io/2015/07/22/Tornado.gen/
引言
注:正文中引用的 Tornado 代码除特别说明外,都默认引用自 Tornado 4.0.1。tornado.gen 模块是一个基于 python generator 实现的异步编程接口。通过该模块提供的 coroutine (注:这里 coroutine 指的是 ”协程” 概念而不是后面具体实现的 decorator:@gen.decorator),大大简化了在 Tornado 中编写异步代码的工作 —— 支持 “同步方式编写异步代码” ,避免编写烦人的回调函数。参考官方文档的例子,通常我们编写的异步代码如下:
1 2 3 4 5 6 7 8 9 10 | class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch("http://example.com", callback=self.on_fetch) def on_fetch(self, response): do_something_with_response(response) self.render("template.html") |
1 2 3 4 5 6 7 8 | class GenAsyncHandler(RequestHandler): @asynchronous @gen.engine def get(self): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com") do_something_with_response(response) self.render("template.html") |
1 2 3 4 5 6 7 | class GenAsyncHandler(RequestHandler): @gen.coroutine def get(self): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com") do_something_with_response(response) self.render("template.html") |
很显然,采用同步方式编写的异步代码相比起分散在各处的异步回调函数代码,更利于代码的阅读和逻辑的组织。
该模块的实现非常巧妙也不容易理解,作为阅读 Tonardo 源码的笔记,我将在后面内容中结合源码和自己的理解对其实现进行分析。
@gen.coroutine 与 @gen.engine 的实现原理
tornado.gen 支持以同步方式编写异步代码的核心就是 python generator。其原理简单来说,就是通过 generator.next() 启动 yield 返回的 generator ,通过 IOLoop 与 generator.send(value) 驱动 generator 运行,以达到协调异步执行的目的。从功能上来看, @gen.coroutine 与 @gen.engine 的功能非常相似,差别就在于二者对被装饰方法参数中的 “callback” 参数处理不一样以及具有不同的返回值。 @gen.coroutine 装饰的方法执行后返回 Future 对象并且会将方法参数中的 “callback” 加入到 Future 完成后的回调列表中;@gen.engine 装饰的方法执行后没有返回值(注:实际上如果被装饰方法有返回值,会抛出 ReturnValueIgnoredError 异常,详见后面的代码分析部分)。
所以,通过 @gen.engine 装饰的方法没有返回值,方法必须自己在异步调用完成后调用 “callback” 来执行回调动作,而通过 @gen.coroutine 装饰的方法则可以直接返回执行结果,然后由 gen 模块负责将结果传递给 “callback” 来执行回调。
注: 从调用者的角度来看
@gen.coroutine可以视为
@tornado.concurrent.return_future与
@gen.engine的组合。
@gen.coroutine 实现原理
@gen.coroutine 中充分利用了 generator 的特性,下面是其实现代码及分析。
1 2 3 | def coroutine(func, replace_callback=True): """Decorator for asynchronous generators.""" return _make_coroutine_wrapper(func, replace_callback=True) |
_make_coroutine_wrapper(func, replace_callback) 函数作为 @gen.coroutine 和 @gen.engine 内部实现,通过 replace_callback 的值来决定是否对 “callback” 方法参数进行处理。coroutine 的实现中通过 replace_callback=True 调用 _make_coroutine_wrapper 函数,会检查方法参数中是否有 “callback” 参数,如果有的话会将其加入到方法返回值 Future 的完成后回调列表中。如下面代码所示:
1 2 3 4 5 6 7 8 9 1011 | def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() # 处理 “callback”,忽略或者将其加入到 Future 的完成回调列表中。 if replace_callback and 'callback' in kwargs: callback = kwargs.pop('callback') IOLoop.current().add_future( future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) except (Return, StopIteration) as e: # 在 python 2 以及 python 3.3 以前,generator 中不能直接通过 # return 返回值:return 被视为 raise StopIteration(), # return <something> 被视为raise StopIteration(<something>)。 # 在 gen 模块中,特别定义了 Return 类型用于返回值:raise gen.Return(something>) result = getattr(e, 'value', None) except Exception: # 发生异常,异常被写入 future(将会被设置为完成状态),结束调用,返回 future future.set_exc_info(sys.exc_info()) return future else: if isinstance(result, types.GeneratorType): # 通过检查 result 是否为 GeneratorType 来选择是否创建 coroutine ,对于 # 同步情况直接 future.set_result(result) 返回,避免创建 coroutine 而 # 造成的性能损失。 # 与 Tornado 4.0 之前的版本比较,这里已经把顶层 ExceptionStackContext # 的构建以及 Runner.run 的功能进行了重构,都迁移到了 Runner 实现中。 # # 通过 next 启动 generator ,启动前记录上下文,启动后对上下文进行一致性检查。 # 若 generator 中有从 "with StackContext" 直接 “yield” 的代码逻辑,将抛 # 出 StackContextInconsistentError 异常。 try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) if stack_context._state.contexts is not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future.set_result(getattr(e, 'value', None)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) try: return future finally: # Subtle memory optimization: if next() raised an exception, # the future's exc_info contains a traceback which # includes this stack frame. This creates a cycle, # which will be collected at the next full GC but has # been shown to greatly increase memory usage of # benchmarks (relative to the refcount-based scheme # used in the absence of cycles). We can avoid the # cycle by clearing the local variable after we return it. # # 代码注释中说,generator.next() 抛出异常失败后, future 的 exc_info # 中会包含当前栈帧的引用,栈帧中也有对 future 的引用,这样导致一个环,必须 # 要在下一次 full GC 时才能回收内存。返回 future 后将 future 设置为 None # 可以优化内存。(注:需要 full GC 是与 python 的垃圾回收实现采用引用计数 # 为主,标记-清除和分代机制为辅相关。python 采用引用计数来立刻释放可以释放 # 的内存,然后用标记-清除的方法来清除循环引用的不可达对象。) future = None # 同步情况下,不需要创建 coroutine,直接返回 future。 future.set_result(result) return future return wrapper class Return(Exception): def __init__(self, value=None): super(Return, self).__init__() self.value = value |
如下面的代码所示, IOLoop 的 add_future 方法会封装回调方法,在 Future 完成以后会将 “callback” 加入到 IOLoop 的回调列表中以等待 IOLoop 调度执行回调动作。
1 2 3 4 5 6 7 8 9 1011 | def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) # 在 future 的完成回调列表中增加一个 lambda 表达式,负责在 # 将 “callback” 加入 IOLoop 调度执行。 future.add_done_callback( lambda future: self.add_callback(callback, future)) |
_make_coroutine_wrapper函数已经完成了 coroutine 的创建,其代码逻辑比较简单,而整个 coroutine 启动、运行的核心功能被实现在
Runner类中。
Runner有一个
run()方法,该方法负责启动 coroutine,并与 IOLoop 配合驱动 YieldPoint(注:在 generator 中通过 yield 返回的实例类型,Tornado 4.0 及以后推荐使用 Futures 类型, YieldPoint 类型被放弃) 执行直到 result_future 完成。
run()方法的详细代码如下所示:
1 2 3 4 5 6 7 8 9 1011 | def run(self): """Starts or resumes the generator, running until it reaches a yield point that is not ready. """ if self.running or self.finished: return try: self.running = True while True: future = self.future # 当前 future 没有完成时直接返回,等待 IOLoop 在 future 完成后回调再执行 if not future.done(): return # 当前 future 完成后对 coroutine 接下来运行没作用,立即释放 self.future = None try: orig_stack_contexts = stack_context._state.contexts try: value = future.result() except Exception: self.had_exception = True yielded = self.gen.throw(*sys.exc_info()) else: # 将 future 的结果赋值给当前 yield 表达式,驱动 generator 继续 # 执行, (如果generator未结束的话)返回下一个 yield 表达式结果 yielded = self.gen.send(value) if stack_context._state.contexts is not orig_stack_contexts: self.gen.throw( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: # generator 执行完成,将 执行结果赋值给 result_future,返回 self.finished = True self.future = _null_future # Tornado 4.0 之前使用 YieldPoint 驱动,Callback 与 Wait/WaitAll # 协调时,Callback 的回调结果需要 Runner 作为中转站,通过 # Runner.register_callback(key) 登记 Callback ,再通过 # YieldPoint.result_callback(key) 取回“设置(回调)方法”, # 外部通过“设置(回调)方法”把结果保存到 Runner.results 字典中。 # Wait/WaitAll 通过 get_result(key) 取回 结果。 # YieldFuture 的实现也采用了相同的实现方式。 # Tornado 4.0 之后使用 Future 代替 YieldPoint,这些已经过时。 # 与 Yield 相关的代码都是为了向后兼容。 if self.pending_callbacks and not self.had_exception: # If we ran cleanly without waiting on all callbacks # raise an error (really more of a warning). If we # had an exception then some callbacks may have been # orphaned, so skip the check in that case. raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None self._deactivate_stack_context() return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None self._deactivate_stack_context() return # 继续处理 yield 表达式结果 if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): # 为了保持向后兼容,需要对多个 YieldPonit 和 Future 的混合集合做处理。 # 对于全是 Future 的集合类型使用新的 multi_future 函数进行封装处理; # 不全是的使用 Multi 类进行封装,对于 Future 提供了 YieldFuture 适配器类。 # 详细的实现细节见 YieldFuture、Multi的实现代码。 # 若需要 run() 循环立即处理该 YieldPoint(被启动)/Future(已经完成) 则返 # 回 True,否则返回 False。 if isinstance(yielded, list): if all(is_future(f) for f in yielded): yielded = multi_future(yielded) else: yielded = Multi(yielded) elif isinstance(yielded, dict): if all(is_future(f) for f in yielded.values()): yielded = multi_future(yielded) else: yielded = Multi(yielded) # 针对第一个 YieldPoint 使用一个 ExceptionStackContext 上下文来处理 # StackContexts 中没有处理的异常,将未处理的异常记录到 result_future 中。 # 对于 Future 对象则没有必要, Future 提供了方法来记录异常和异常堆栈信息, # 在 Future 完成后通过其 result() 方法获取结果(在 run 方法的调用)时会 # 再次抛出异常,这时可捕获记录到 result_future 中。 if isinstance(yielded, YieldPoint): self.future = TracebackFuture() def start_yield_point(): try: yielded.start(self) # 如果 yielded 已经完成,则将其结果赋值给 self.future,等待 run 循环处理; # 若未就绪,则需要通过 Runner.set_result(key, value) 来进行赋值操作。 if yielded.is_ready(): self.future.set_result( yielded.get_result()) else: self.yield_point = yielded except Exception: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if self.stack_context_deactivate is None: # Start a stack context if this is the first # YieldPoint we've seen. with stack_context.ExceptionStackContext( self.handle_exception) as deactivate: self.stack_context_deactivate = deactivate def cb(): start_yield_point() self.run() # 第 1 个 yielded 交由 IOLoop来启动 self.io_loop.add_callback(cb) return False else: # 启动 YieldPoint,需要返回 True,在 run 循环中继续处理 start_yield_point() elif is_future(yielded): self.future = yielded # self.future 完成后继续 self.run() # moment = Future() 是一个特殊的对象,主要用在需要长时间执行的 coroutine 中, # 通过 “yield gen.moment” 中断当前 coroutine ,将控制权交给 IOLoop 去轮询。 # 等效于当前 coroutine 临时放弃时间片,给了其他 callback 机会运行。 if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False else: self.future = TracebackFuture() self.future.set_exception(BadYieldError( "yielded unknown object %r" % (yielded,))) return True |
1 2 3 4 5 6 7 8 9 1011 | def __init__(self, gen, result_future, first_yielded): self.gen = genreturn_futurereturn_future self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() # For efficiency, we do not create a stack context until we # reach a YieldPoint (stack contexts are required for the historical # semantics of YieldPoints, but not for Futures). When we have # done so, this field will be set and must be called at the end # of the coroutine. self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run() |
@gen.engine 实现原理
前面内容已经说过
@gen.engine与
@gen.coroutine是非常相似的,对使用者而言
@gen.coroutine就是
@concurrent.return_future和
@gen.engine的组合(详见 concurrent.return_future 的实现)。下面是
@gen.engine的实现代码:
1 2 3 4 5 6 7 8 9 1011 | def engine(func): # 不对被装饰方法的 "callback" 参数做替换处理,也就是说即使被装饰方法有 “callback” 参数, # 在 coroutine 执行完成得到结果以后不会“自动调用”该 “callback”。细节将 # _make_coroutine_wrapper 实现代码。 func = _make_coroutine_wrapper(func, replace_callback=False) @functools.wraps(func) def wrapper(*args, **kwargs): # 获取 coroutine 的执行结果,由于 coroutine 执行完成后不会自动调用 "callback" ,所 # 以要求被装饰方法不能有返回值(非 None)而必须自己调用 "callback",否则抛出 # ReturnValueIgnoredError 异常。 future = func(*args, **kwargs) def final_callback(future): if future.result() is not None: raise ReturnValueIgnoredError( "@gen.engine functions cannot return values: %r" % (future.result(),)) future.add_done_callback(final_callback) return wrapper |
把普通的异步方法适配到 coroutine
通过上面的分析可以看到,Tornado 实现的 coroutine 支持编写 “同步方式的异步代码”,但是要求异步调用的返回值是 Future 或者 YieldPoint 实例,对于这样的异步方法,我们只需要简单的使用 yield 表达式便可以轻松将其转换为 coroutine。而对于返回值为 None,仅支持通过 “callback” 参数回调(异步结果执行的结果会作为 “callback” 调用的实参)的普通异步方法,便不能直接被 Tornado 的 coroutine 支持,需要我们自己做一些额外的封装工作。tornado.gen 模块提供了一个标准的封装函数 Task(注:Tornado 4.0 以前 Task 是作为 YieldPoint 的子类来实现的,之后改为返回 Future 实例的函数,为了向后兼容,所以是一个拥有“类名”的函数。)。Task 的实现原理很简单,因为这种普通异步方法没有返回值而是通过把异步结果作为回调函数的实参来达到传递的目的,所以 Task 就将这种方法包装成返回值为 Future 的方法然后通过方法的回调函数来把异步结果传递给返回的 Future 实例。下面是 Task 的实现代码:
1 2 3 4 5 6 7 8 9 1011 | def Task(func, *args, **kwargs): future = Future() def handle_exception(typ, value, tb): if future.done(): return False future.set_exc_info((typ, value, tb)) return True # 提供给 func 的回调函数,将 func 的异步结果传递给 future。 # 注意: 这个回调函数仅支持一个 “参数”。 def set_result(result): if future.done(): return future.set_result(result) # 由于 func 的 “callback” 的形参个数是不确定的(或者说 func 回调 “callback” # 的形参个数不确定),要适配 set_result 就需要将形参包装成一个对象传递给 # set_result。 _argument_adapter 函数就是负责完成这个封装功能的,对于 0 个或 1 # 个实参调用的情况,_argument_adapter 不做任何处理直接将该参数传递给 set_result, # 对于多个实参的情况,这将参数包装成 Arguments(namedtuple) 传递给 set_result。 # 参数的 “callback”, with stack_context.ExceptionStackContext(handle_exception): func(*args, callback=_argument_adapter(set_result), **kwargs) return future Arguments = collections.namedtuple('Arguments', ['args', 'kwargs']) def _argument_adapter(callback): """Returns a function that when invoked runs ``callback`` with one arg. If the function returned by this function is called with exactly one argument, that argument is passed to ``callback``. Otherwise the args tuple and kwargs dict are wrapped in an `Arguments` object. """ def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper |
结束语
python generator 是个功能强大的利器(PEP 342 加入了新的特性,能让生成器在单一语句中实现,生成一个值或者接受一个值,或同时生成一个值并接受一个值。),是 tornado.gen 模块实现 coroutine 的基石。虽然 tornado.gen 的核心就是 generator,但是其整个设计和实现都非常巧妙,并且随着 Tornado 版本的演变该模块也在不断重构和优化,对比其不同版本的实现演进,对于我们理解学习都非常有价值,值得反复研读。相关文章推荐
- Java根据两点的经纬度来计算之间的距离
- Oracle round()函数与trunc()函数区别介绍
- object_getClass
- 关于ios中的点赞控件效果的实现--UIControl
- spark数据挖掘 - 基于 Audioscrobbler 数据集音乐推荐实战
- DRBD原理及特性概述
- oracle恢复备份数据
- 多子系统项目架构与人员管理
- 题解
- Python 基础——Python程序员常犯的那些错误
- 玩转Visual Studio之开篇介绍
- google搜索镜像
- Eclipse中Vi插件Vrapper的安装
- 林业通用平台上线
- C#实现下载功能,可用于软件自动更新
- 产品经理的三种特质
- 持久化存储系统本版号
- 半监督学习研究
- Session的缓存
- poj 1751 Highways 最小生成树 prim