深入理解 Python 协程
协程
import asyncio
import time
# 协程函数返回的是一个协程对象
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
# 等待1s后再等待2s,不能做到并行运行
await say_after(1, 'hello') # await 等待协程运行结束
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
# asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数
asyncio.run(main())
asyncio.create_task()
函数用来运行作为 asyncio
任务的多个协程。
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待两个 task 完成,并行运行,只需要 2s.
await task1
await task2
print(f"finished at {time.strftime('%X')}")
可等待对象
如果一个对象可以在 await
语句中使用,那么它就是 可等待 对象。
可等待对象有三种主要类型: 协程, 任务 和 Future.
协程
协程函数: 定义形式为 async def
的函数;
协程对象: 调用 协程函数 所返回的对象。
import asyncio
# 协程函数
async def nested():
return 42
async def main():
# 调用协程函数返回的是协程对象(coroutine object),不能运行
nested()
# 协程通过 await 可等待的方式运行
print(await nested())
asyncio.run(main())
任务
任务: 被用来 “并行的” 调度协程
当一个协程通过 asyncio.create_task(coro,*,name=None)
等函数被封装为一个任务,该协程会被 自动调度 执行:
该任务会在 get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。
task.add_done_callback(func)
设置任务完成的回调函数
import asyncio
async def nested():
return 42
async def main():
# 将协程封装成一个任务
task = asyncio.create_task(nested())
# 等待直到它完成
await task
asyncio.run(main())
create_task
会把可等待对象被封装为一个任务,该协程会被 自动调度 执行,所以不用 await
import asyncio
async def wait(times: int):
print(f"等待{times}s")
await asyncio.sleep(times)
print("等待结束!")
async def main():
# asyncio.wait_for(wait(3), 2) 不 await 就出错: RuntimeWarning: coroutine 'wait_for' was never awaited
# create_task 会把可等待对象被封装为一个任务,该协程会被 **自动调度** 执行,
# 所以不用 await 也能执行, await 了就表示等待这个任务执行完成!
asyncio.create_task(asyncio.sleep(5))
await asyncio.create_task(wait(100)) # 可用于堵塞事件循环,不退出
if __name__ == "__main__":
asyncio.run(main())
Futures
Future
是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
通常情况下没有必要在应用层级的代码中创建 Future
对象。
运行 asyncio 协程
asyncio.run(coro, *, debug=False)
执行 coroutine coro
并返回结果。
此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
当有其他 asyncio
事件循环在同一线程中运行时,此函数不能被调用。
如果 debug 为 True,事件循环将以调试模式运行。
此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
并发运行任务
asyncio.gather(*aws, return_exceptions=False)
并发运行 aws 序列中的可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务(asyncio.create_task)调度。
如果 return_exceptions
为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。
如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
import asyncio
async def factorial(name, number):
"""计算 number! 阶乘并返回
example: 3!=1*2*3
"""
# 测试如果出现异常
if name == "error":
print(f"任务名:{name} 出现异常!")
raise EOFError
f = 1
for i in range(2, number + 1):
print(f"任务名:{name}:计算阶乘({number}), i={i}...")
await asyncio.sleep(1) # await 使协程等待,让出给其他协程使用
f *= i
print(f"任务名{name}: {number}! = {f}")
return f
async def main():
# gather 一起执行可等待对象,并按调用顺序返回
# gather 会阻塞直到 gather 中的所有可等待对象完成
print("async start!")
L = await asyncio.gather(
factorial("error", None),
factorial("A", 2),
factorial("B", 3),
factorial("C", 4), return_exceptions=False
)
# 当 return_exceptions 为 False 时引发的首个异常会传播给 gather 可等待对象列表的任务
# 整个程序停摆,未执行完的可等待对象也会取消
# 当 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
# 并不会引发整个程序的异常
print(f"async result:{L}")
print("async end....")
asyncio.run(main())
如果 gather 本身被取消,则无论 return_exceptions
取值为何,消息都会被传播。
if name == "error":
print(f"任务名:{name} 出现异常!")
gather_waiting_object.cancel() # 取消 gather
# 错误: asyncio.exceptions.CancelledError
超时
协程(可等待对象) asyncio.wait_for(aw, timeout)
等待 aw 可等待对象 完成,指定 timeout
秒数后超时。
如果 aw 是一个协程,它将自动被作为任务((asyncio.create_task))调度。timeout
可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
如果发生超时,任务将取消并引发 asyncio.TimeoutError.
要避免任务 取消,可以加上 shield()。
此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播。
如果等待被取消,则 aw 指定的对象也会被取消。
简单等待
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
并发地运行 aws 可迭代对象中的 可等待对象
(不能直接传入协程对象需要转换为 Task) 并进入阻塞状态直到满足 return_when 所指定的条件。
return_when 指定此函数应在何时返回:
- FIRST_COMPLETED: 在第一个可等待对象运行完毕后返回
- FIRST_EXCEPTION: 在任意可等待对象抛出异常后返回,不会结束其他可等待对象,并在程序结束最后抛出异常
- ALL_COMPLETED(默认): 在所有可等待对象执行完毕后返回
aws 可迭代对象必须不为空。 返回两个 Task/Future 集合: (done, pending)
(done, panding)
: done:已完成的协程;panding:正在等待的协程;支持通过 if
判断
import asyncio
async def wait(name, times: int):
if name == "A":
raise EOFError
print(f"{name}等待{times}s")
await asyncio.sleep(times)
print(f"{name}等待结束!")
return times
async def main():
task, pending = await asyncio.wait(
(
wait("A", 3),
wait("B", 2),
wait("C", 1)
), return_when=asyncio.FIRST_COMPLETED
)
print("已完成:", task, "\n等待:", pending)
await asyncio.sleep(5)
print("抛出异常")
if __name__ == "__main__":
asyncio.run(main())
(和男朋友出去喝奶茶了,未完待续,可能会继续写一些应用场景,或者和 Golang 的 Goruntine 进行对比)…
2022/8/24 18:34
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。