如何解决Python-取消异步任务?
我在下面为异步池编写了代码。在__aexit__
中,我要在任务完成后取消_worker任务。但是,当我运行代码时,工作任务并没有被取消,并且代码永远运行。任务如下所示:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>
。 asyncio.wait_for
被取消,但工作任务未取消。
class AsyncPool:
def __init__(self,coroutine,no_of_workers,timeout):
self._loop = asyncio.get_event_loop()
self._queue = asyncio.Queue()
self._no_of_workers = no_of_workers
self._coroutine = coroutine
self._timeout = timeout
self._workers = None
async def _worker(self):
while True:
try:
ret = False
queue_item = await self._queue.get()
ret = True
result = await asyncio.wait_for(self._coroutine(queue_item),timeout = self._timeout,loop= self._loop)
except Exception as e:
print(e)
finally:
if ret:
self._queue.task_done()
async def push_to_queue(self,item):
self._queue.put_nowait(item)
async def __aenter__(self):
assert self._workers == None
self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
return self
async def __aexit__(self,type,value,traceback):
await self._queue.join()
for worker in self._workers:
worker.cancel()
await asyncio.gather(*self._workers,loop=self._loop,return_exceptions =True)
要使用Asyncpool,
async def something(item):
print("got",item)
await asyncio.sleep(item)
async def main():
async with AsyncPool(something,5,2) as pool:
for i in range(10):
await pool.push_to_queue(i)
asyncio.run(main())
解决方法
问题在于您的except Exception
异常子句还会捕获取消,并忽略它。更令人困惑的是,print(e)
仅在出现CancelledError
时打印空行,这是输出中的空行来自的地方。 (将其更改为print(type(e))
可以显示发生了什么。)
要解决此问题,请将except Exception
更改为更具体的内容,例如except asyncio.TimeoutError
。在Python 3.8中,不需要进行此更改,在Python 3.8中,asyncio.CancelledError
不再源自Exception
,而是源自BaseException
,因此except Exception
不能抓住它。
创建asyncio
任务然后将其取消后,您仍然有需要“回收”的任务。因此,您想要await worker
。但是,一旦您await
取消了这样的任务,因为它将永远不会给您返回期望的返回值,则asyncio.CancelledError
将被提升,您需要将其捕获在某个地方。
由于这种行为,我认为您不应该gather
将它们await
} async def __aexit__(self,type,value,traceback):
await self._queue.join()
for worker in self._workers:
worker.cancel()
for worker in self._workers:
try:
await worker
except asyncio.CancelledError:
print("worker cancelled:",worker)
,因为它们应该立即返回:
{{1}}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。