您的位置:首页 > 其它

tornado 中非阻塞 gen.coroutine 原理

2016-06-20 14:48 351 查看
异步代码编写,一般会通过回调函数的方式获取任务完成的消息,但是如果逻辑很复杂,回调嵌套就越深,代码也就越难理解,如下:

class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch('http://example1.com', callback=self.on_fetch)

def on_fetch1(self, response):
self.result1 = response
http_client.fetch('http://example2.com', callback=self.on_fetch)

def on_fetch2(self, response):
self.resutl2 = response
#...

修饰器
gen.coroutine
简化异步代码的编写,避免写回调函数,加快开发效率,提高代码可读性,通过结合 pyhton 的
yield
语句实现协程

from tornado import gen

@gen.coroutine
def fetch_coroutine():
http_client = AsyncHTTPClient()
self.result1 = yield http_client.fetch(url1)
self.result2 = yield http_client.fetch(url2)
self.result3 = yield http_client.fetch(url3)
#...

通过
@gen.coroutine
修饰的函数返回值变为
Future
, 在调用结束的时候会调用
Future.set_result
,这样就会调用与
Future
相关联的回调

fetch_coroutine
为一个生成器,
@gen.coroutine
内部会对生成器
fetch_coroutine
进行调用直到无返回值为止,
@gen.coroutine
由函数
_make_coroutine_wrapper
实现:

def _make_coroutine_wrapper(func, replace_callback):
if hasattr(types, 'coroutine'):
func = types.coroutine(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()

if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))

try:
# 调用实际的处理函数如果返回GeneratorType,那么进入Runner类循环执行知道终止
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, GeneratorType):
# Inline the first iteration of Runner.run.  This lets us
# avoid the cost of creating a Runner when the coroutine
# never actually yields, which in turn allows us to
# use "optional" coroutines in critical path code without
# performance penalty for the synchronous case.
try:
orig_stack_contexts = stack_context._state.contexts
# 返回一个Future,异步调用返回的Future
#  在异步调用成功后会将这个Future给set_done
yielded = next(result)

if stack_context._state.contexts is \
not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(_value_from_stopiteration(e))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper

下面介绍大致过程:

调用
func
即被包装函数
fetch_coroutine
,如果
func
返回值是一个生成器(python 中含有
yield
的函数为生成器,直接调用返回一个生成器),通过
next(result)
可以得到函数
fetch_coroutine
中第一次
yield
出来的值,这里是一个
Future
,最后经由Runner处理。外层函数
_make_coroutine_wrapper
会返回一个
Future
,用于通知异步回调完成,如果
func
非生成器,直接设置
Future
结果并返回.

注意上面的整个过程都是非阻塞,
Runner
中主要需要理解它两个函数
run
handle_yield
就可以明白大致原理,这个两个函数相互配合调用

run
Future
得到结果即
done
之后会调用,发送结果给
yield
语句,通知任务完成,在
fetch_coroutine
中我们得到一个
self.result1 = yield http_client.fetch(url1)
结果,如果生成器未结束即后面代码还有
yield
,在发送结果
self.gen.send
后会继续返回新的
Future
, 将其传递给
handle_yield


handle_yield
设置
Future
,将
run
与最新得到的
Future
关联,
handle_yield
中处理
yield
列表与字典情况,这里不做过多介绍

class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
# For efficiency, we do not create a stack context until we
# reach a YieldPoint (stack contexts are required for the historical
# semantics of YieldPoints, but not for Futures).  When we have
# done so, this field will be set and must be called at the end
# of the coroutine.
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
self.run()
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None

try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()

if exc_info is not None:
yielded = self.gen.throw(*exc_info)
exc_info = None
else:
yielded = self.gen.send(value) # 发送异步调用结果,通知任务完成
# self.gen为生成器,即被@gen.coroutine修饰的函数,self.gen.send发送结果到 yield语句进行赋值
# 即 result = yield clien.fetch(url),这里的result即为value
# 生成器结束会得到StopIteration异常结束调用,否则继续调用self.handle_yield
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning).  If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return
if not self.handle_yield(yielded):
return
finally:
self.running = False

def handle_yield(self, yielded):
# Lists containing YieldPoints require stack contexts;
# other lists are handled in convert_yielded.
# 处理 yield list or dict 情况
if _contains_yieldpoint(yielded):
yielded = multi(yielded)

if isinstance(yielded, YieldPoint):
# YieldPoints are too closely coupled to the Runner to go
# through the generic convert_yielded mechanism.
self.future = TracebackFuture()

def start_yield_point():
try:
yielded.start(self)
if yielded.is_ready():
self.future.set_result(
yielded.get_result())
else:
self.yield_point = yielded
except Exception:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())

if self.stack_context_deactivate is None:
# Start a stack context if this is the first
# YieldPoint we've seen.
with stack_context.ExceptionStackContext(
self.handle_exception) as deactivate:
self.stack_context_deactivate = deactivate

def cb():
start_yield_point()
self.run()
self.io_loop.add_callback(cb)
return False
else:
start_yield_point()
else:
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())

if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: