您的位置:首页 > 编程语言 > Python开发

基于 pyuv + Python 3.5 的超简化异步编程方案(参考 asyncio)

2016-01-15 18:12 519 查看
pyuv安装请戳这里

实现原理与 asyncio 类似,可参考 asyncio 的实现。



测试代码:

import pyuv
import sys
import functools
import time

class Futear:
    """Minimal future, modelled on ``asyncio.Future``.

    On construction the future remembers the coroutine recorded in
    ``Task._current_coro``.  When ``set_result`` is called, that coroutine
    is resumed and then the optional completion callback is invoked with
    the future itself.
    """

    _PENDING = 0
    _CANCELLED = 1
    _FINISHED = 2

    def __init__(self, callback=None):
        """Create a pending future; *callback* (if given) is called on completion."""
        self._result_args = []
        self._result_kwargs = {}
        self._state = Futear._PENDING
        self._callback = callback
        self._catch_coro()

    def set_callback(self, cb):
        """Replace the completion callback."""
        self._callback = cb

    def _catch_coro(self):
        # Remember which coroutine to resume once a result is available.
        self._coro = Task._current_coro

    def _schedule_callback(self):
        if self._callback is not None:
            self._callback(self)

    def _trigger_coro(self):
        """Resume the captured coroutine, marking it as current while it runs."""
        if self._coro is None:
            return
        prev_coro, Task._current_coro = Task._current_coro, self._coro
        try:
            self._coro.send(None)
        except StopIteration:
            # The coroutine ran to completion; nothing left to resume.
            pass
        finally:
            # Restore the marker that was swapped out above (the original
            # assigned to a misspelled ``_current_task`` attribute, so the
            # marker was never reset).
            Task._current_coro = prev_coro

    def cancel(self):
        """Mark the future as cancelled."""
        self._state = Futear._CANCELLED

    def set_result(self, *args):
        """Store *args* as the result, resume the waiter, then run the callback."""
        self._result_args = args
        self._state = Futear._FINISHED
        self._trigger_coro()
        self._schedule_callback()

    def result(self):
        """Return the stored positional results as a tuple.

        If keyword results are present, the kwargs dict is appended as the
        last element of the tuple.
        """
        if self._result_kwargs:
            return (*self._result_args, self._result_kwargs)
        return (*self._result_args, )

    def is_pending(self):
        return self._state == Futear._PENDING

    def is_done(self):
        return self._state == Futear._FINISHED

    def is_running(self):
        return self._state == Futear._PENDING

    def __iter__(self, *args):
        """Suspend the awaiting coroutine until a result is set.

        This must be a generator: ``await`` requires ``__await__`` to return
        an iterator, and yielding hands control back to whoever called
        ``send`` on the coroutine.
        """
        if self.is_running():
            yield self
        return self.result()  # May raise too.

    if sys.version_info >= (3, 5):
        __await__ = __iter__  # make compatible with 'await' expression

class Task(Futear):
    """A future that owns a coroutine and starts driving it on construction."""

    # Set by Futear._trigger_coro to the coroutine it is resuming.
    _current_coro = None

    def __init__(self, coro, callback=None, timeout=None):
        """Register the task and immediately start *coro*.

        *timeout* is accepted and passed to ``_start``, which ignores it.
        """
        super().__init__(callback)
        Task.add_task(self)
        self._state = None
        # When no coroutine is supplied, keep the one captured by Futear.
        if coro is not None:
            self._coro = coro
        self._start(timeout)

    def _start(self, timeout=None):
        """Run the coroutine until its first suspension point."""
        self._trigger_coro()

    @classmethod
    def add_task(cls, task):
        """Registration hook; currently a no-op."""

async def wait(*coros):
    """Await each of *coros* one after another.

    Returns a list of the results, in the order the awaitables were given.
    An empty call returns an empty list.
    """
    # ``coros`` is always a tuple, so no ``None`` check is needed.
    results = []
    for coro in coros:
        results.append(await coro)
    return results

async def parallel(*coros):
    """Start every awaitable in *coros* as a Task and wait for all of them.

    Returns the value produced by awaiting the shared ``Futear``, whose
    result is set to the per-coroutine outcomes in argument order.  An
    awaitable that raises an ``Exception`` contributes the exception object
    as its outcome.  With no arguments, returns an empty tuple at once.
    """
    if not coros:
        # No task would ever call set_result, so awaiting would never resume.
        return ()

    _waiter = Futear()
    outcomes = [0] * len(coros)
    finished = 0

    async def task_package(coro, index, waiter):
        # Record this coroutine's outcome; the last one to finish releases
        # the waiter with all outcomes.
        nonlocal finished
        try:
            outcome = await coro
        except Exception as exc:
            outcome = exc
        outcomes[index] = outcome
        finished += 1
        if finished >= len(coros):
            waiter.set_result(*outcomes)

    for index, coro in enumerate(coros):
        Task(task_package(coro, index, _waiter))

    return await _waiter

# Module-level pyuv event loop: used by sleep() for its timers and run from
# the __main__ block.
loop = pyuv.Loop()

async def sleep(sec):
    """Suspend the calling coroutine for *sec* seconds using a pyuv timer.

    Returns whatever awaiting the internal ``Futear`` produces; the timer
    callback sets that future's result to ``"abc"``.
    """
    timer = pyuv.Timer(loop)
    waiter = Futear()

    def callback(result, waiter, timer_handle):
        # The timer is one-shot, so close the handle to release it instead
        # of leaving a stopped handle behind for every sleep() call.
        timer_handle.close()
        waiter.set_result(result)

    # repeat=0 makes the timer fire exactly once after ``sec`` seconds.
    timer.start(functools.partial(callback, "abc", waiter), sec, 0)
    return await waiter

async def slow_operation(n):
    """Sleep for one second, report completion for operation *n*, return 1."""
    await sleep(1)
    label = "%d:%s" % (n, slow_operation.__name__)
    message = "Slow operation {} complete".format(label)
    print(message)
    return 1

async def async_tasks():
    """Run slow operations 1 through 6 via parallel() and print the elapsed time."""
    started = time.time()
    operations = [slow_operation(number) for number in range(1, 7)]
    await parallel(*operations)
    elapsed = time.time() - started
    print('Complete in {} second(s)'.format(elapsed))

async def sync_tasks():
    """Run slow operations 4, 5 and 6 via wait() and print the elapsed time."""
    started = time.time()
    operations = [slow_operation(number) for number in (4, 5, 6)]
    await wait(*operations)
    elapsed = time.time() - started
    print('Complete in {} second(s)'.format(elapsed))

if __name__ == "__main__":
    # Creating a Task starts its coroutine immediately; the timing printed
    # here covers only task creation, not the work done inside loop.run().
    print("before creating task.")
    started = time.time()
    Task(async_tasks())
    Task(sync_tasks())
    elapsed = time.time() - started
    print("after creating task.")
    print('Complete in {} second(s)'.format(elapsed))

    # Run the event loop so the pending timers can fire.
    loop.run()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: