tornado 中非阻塞 gen.coroutine 原理
2016-06-20 14:48
351 查看
异步代码编写,一般会通过回调函数的方式获取任务完成的消息,但是如果逻辑很复杂,回调嵌套就越深,代码也就越难理解,如下:
修饰器
通过
下面介绍大致过程:
调用
注意上面的整个过程都是非阻塞,
class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch('http://example1.com', callback=self.on_fetch) def on_fetch1(self, response): self.result1 = response http_client.fetch('http://example2.com', callback=self.on_fetch) def on_fetch2(self, response): self.resutl2 = response #...
修饰器
gen.coroutine简化异步代码的编写,避免写回调函数,加快开发效率,提高代码可读性,通过结合 pyhton 的
yield语句实现协程
from tornado import gen @gen.coroutine def fetch_coroutine(): http_client = AsyncHTTPClient() self.result1 = yield http_client.fetch(url1) self.result2 = yield http_client.fetch(url2) self.result3 = yield http_client.fetch(url3) #...
通过
@gen.coroutine修饰的函数返回值变为
Future, 在调用结束的时候会调用
Future.set_result,这样就会调用与
Future相关联的回调
fetch_coroutine为一个生成器,
@gen.coroutine内部会对生成器
fetch_coroutine进行调用直到无返回值为止,
@gen.coroutine由函数
_make_coroutine_wrapper实现:
def _make_coroutine_wrapper(func, replace_callback): if hasattr(types, 'coroutine'): func = types.coroutine(func) @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() if replace_callback and 'callback' in kwargs: callback = kwargs.pop('callback') IOLoop.current().add_future( future, lambda future: callback(future.result())) try: # 调用实际的处理函数如果返回GeneratorType,那么进入Runner类循环执行知道终止 result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: future.set_exc_info(sys.exc_info()) return future else: if isinstance(result, GeneratorType): # Inline the first iteration of Runner.run. This lets us # avoid the cost of creating a Runner when the coroutine # never actually yields, which in turn allows us to # use "optional" coroutines in critical path code without # performance penalty for the synchronous case. try: orig_stack_contexts = stack_context._state.contexts # 返回一个Future,异步调用返回的Future # 在异步调用成功后会将这个Future给set_done yielded = next(result) if stack_context._state.contexts is \ not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
下面介绍大致过程:
调用
func即被包装函数
fetch_coroutine,如果
func返回值是一个生成器(python 中含有
yield的函数为生成器,直接调用返回一个生成器),通过
next(result)可以得到函数
fetch_coroutine中第一次
yield出来的值,这里是一个
Future,最后经由Runner处理。外层函数
_make_coroutine_wrapper会返回一个
Future,用于通知异步回调完成,如果
func非生成器,直接设置
Future结果并返回.
注意上面的整个过程都是非阻塞,
Runner中主要需要理解它两个函数
run和
handle_yield就可以明白大致原理,这个两个函数相互配合调用
run在
Future得到结果即
done之后会调用,发送结果给
yield语句,通知任务完成,在
fetch_coroutine中我们得到一个
self.result1 = yield http_client.fetch(url1)结果,如果生成器未结束即后面代码还有
yield,在发送结果
self.gen.send后会继续返回新的
Future, 将其传递给
handle_yield
handle_yield设置
Future,将
run与最新得到的
Future关联,
handle_yield中处理
yield列表与字典情况,这里不做过多介绍
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() # For efficiency, we do not create a stack context until we # reach a YieldPoint (stack contexts are required for the historical # semantics of YieldPoints, but not for Futures). When we have # done so, this field will be set and must be called at the end # of the coroutine. self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run() def run(self): """Starts or resumes the generator, running until it reaches a yield point that is not ready. """ if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: orig_stack_contexts = stack_context._state.contexts exc_info = None try: value = future.result() except Exception: self.had_exception = True exc_info = sys.exc_info() if exc_info is not None: yielded = self.gen.throw(*exc_info) exc_info = None else: yielded = self.gen.send(value) # 发送异步调用结果,通知任务完成 # self.gen为生成器,即被@gen.coroutine修饰的函数,self.gen.send发送结果到 yield语句进行赋值 # 即 result = yield clien.fetch(url),这里的result即为value # 生成器结束会得到StopIteration异常结束调用,否则继续调用self.handle_yield if stack_context._state.contexts is not orig_stack_contexts: self.gen.throw( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: # If we ran cleanly without waiting on all callbacks # raise an error (really more of a warning). If we # had an exception then some callbacks may have been # orphaned, so skip the check in that case. raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(_value_from_stopiteration(e)) self.result_future = None self._deactivate_stack_context() return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None self._deactivate_stack_context() return if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): # Lists containing YieldPoints require stack contexts; # other lists are handled in convert_yielded. # 处理 yield list or dict 情况 if _contains_yieldpoint(yielded): yielded = multi(yielded) if isinstance(yielded, YieldPoint): # YieldPoints are too closely coupled to the Runner to go # through the generic convert_yielded mechanism. self.future = TracebackFuture() def start_yield_point(): try: yielded.start(self) if yielded.is_ready(): self.future.set_result( yielded.get_result()) else: self.yield_point = yielded except Exception: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if self.stack_context_deactivate is None: # Start a stack context if this is the first # YieldPoint we've seen. with stack_context.ExceptionStackContext( self.handle_exception) as deactivate: self.stack_context_deactivate = deactivate def cb(): start_yield_point() self.run() self.io_loop.add_callback(cb) return False else: start_yield_point() else: try: self.future = convert_yielded(yielded) except BadYieldError: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
相关文章推荐
- weui
- 广东海洋大学 电子1151 孔yanfei python语言程序设计 第四周
- 技术小黑屋
- CSS Display(显示) 与 Visibility(可见性)
- -source 1.5 中不支持 diamond 运算符
- 关系型数据库和NoSQL的对比表格
- C# 基础·算法篇
- 注意了,中小卖家如何30天内打造爆款?
- matlab之prod() rem() true() flipdim()函数\shading
- Android照片墙完整版,完美结合LruCache和DiskLruCache
- Photoshop制作漂亮立体的胶卷icon图标
- 安全性测试AppScan工具使用实战
- leetcode_354 Russian Doll Envelopes
- LinkedList其实就那么一回事儿之源码分析
- python pdb调试
- 个人界面 < 头像 > 图片选择(相册,拍照)--如何调用系统的相册,裁剪并且上传
- 多线程异步机制Handler以及AsyncTask
- Android之SurfaceView学习(一)
- jQuery实现右下角可缩放大小的层完整实例
- 将数组中的元素循环左移 p 个元素