如何解决在Python中,如何测试进行asyncio.ensure_future...调用的函数?
在python单元测试中,如果使用asyncio.ensure_future(...)调用了该函数,如何断言该函数已被调用?我有一个失败的简单测试:
async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
# await asyncio.sleep(0) # this allows the asyncio.ensure_future calls to run
assert increment.call_count == 3 # this fails because the calls to increment() haven't happened yet
我发现,如果在assert语句之前插入一个await asyncio.sleep(0)
之类的调用,则测试将成功。我认为这是可行的,因为它取消了测试任务的优先级,并让增量调用首先进行。这是测试中正确的方法吗?
解决方法
保存这些任务的引用,并等待每个任务调用await
或将asyncio.gather
用于一行。
import asyncio
async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0
tasks = [asyncio.ensure_future(increment()) for _ in range(3)]
await asyncio.gather(*tasks)
assert increment.call_count == 3
asyncio.run(test_assert_ensure_future_called())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。