asyncio.CancelledError和“从未检索到_GatheringFuture异常”的怪异行为

如何解决asyncio.CancelledError和“从未检索到_GatheringFuture异常”的怪异行为

我正在观看import asyncio: Learn Python's AsyncIO #3 - Using Coroutines。讲师给出了以下示例:

import asyncio
import datetime

async def keep_printing(name):
    while True:
        print(name,end=" ")
        print(datetime.datetime.now())
        await asyncio.sleep(0.5)

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")


if __name__ == "__main__":
    asyncio.run(main())

输出有一个例外:

First 2020-08-11 14:53:12.079830
Second 2020-08-11 14:53:12.079830
Third 2020-08-11 14:53:12.080828 
First 2020-08-11 14:53:12.580865
Second 2020-08-11 14:53:12.580865
Third 2020-08-11 14:53:12.581901 
First 2020-08-11 14:53:13.081979
Second 2020-08-11 14:53:13.082408
Third 2020-08-11 14:53:13.082408 
First 2020-08-11 14:53:13.583497
Second 2020-08-11 14:53:13.583935
Third 2020-08-11 14:53:13.584946
First 2020-08-11 14:53:14.079666
Second 2020-08-11 14:53:14.081169
Third 2020-08-11 14:53:14.115689
First 2020-08-11 14:53:14.570694
Second 2020-08-11 14:53:14.571668
Third 2020-08-11 14:53:14.635769
First 2020-08-11 14:53:15.074124
Second 2020-08-11 14:53:15.074900
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

讲师试图通过在CancelledError中添加try/except来处理keep_printing

async def keep_printing(name):
    while True:
        print(name,end=" ")
        print(datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print(name,"was cancelled!")
            break

但是,仍然发生相同的异常:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

然后,讲师只是继续学习其他主题,而从没有再回到这个示例来演示如何解决它。幸运的是,通过实验,我发现可以通过在try/except异步函数的except asyncio.TimeoutError:下添加另一个main来解决此问题:

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),3)
    except asyncio.TimeoutError:
        print("Time's up!")
        try:
            await group_task
        except asyncio.CancelledError:
            print("Main was cancelled!")

最终输出是:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
Main was cancelled!

实际上,对于此版本的main,我们甚至不需要try...except asyncio.CancelledError中的keep_printing。仍然可以正常工作。

那是为什么?为什么在CancelledError中捕获main而不在keep_printing中捕获有效?视频讲师处理此异常的方式只会让我更加困惑。首先,他不需要更改keep_printing的任何代码!

解决方法

当aw由于超时而被取消时,wait_for等待aw被取消。如果在协程中处理CancelledError,则会收到超时错误。 在3.7版中进行了更改。
示例

import asyncio
import datetime

async def keep_printing(name):
    print(datetime.datetime.now())
    try:
        await asyncio.sleep(3600)
    except asyncio.exceptions.CancelledError:
        print("done")

async def main():
    try:
        await asyncio.wait_for(keep_printing("First"),timeout=3)
    except asyncio.exceptions.TimeoutError:
        print("timeouted")


if __name__ == "__main__":
    asyncio.run(main())

用于从Task或Future检索结果的collect方法,您有一个无限循环,并且永不返回任何结果。如果aws序列中的任何Task或Future被取消(wait_for发生了什么情况),则将其视为引发CancelledError的情况–在这种情况下不会取消collect()调用。这是为了防止取消一个已提交的任务/功能导致其他任务/功能被取消。
对于保护性聚集方法,您可以将其覆盖到屏蔽层上。

import asyncio
import datetime


async def keep_printing(name):
    while True:
        print(name,datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.exceptions.CancelledError:
            print(f"canceled {name}")
            return None

async def main():
    group_task = asyncio.shield(asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third"))
                    )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.exceptions.TimeoutError:
        print("Done")


if __name__ == "__main__":
    asyncio.run(main())
,

让我们找出正在发生的事情:

  1. 此代码调度要执行的三个协程,并返回汇总结果的Future对象group_task(内部类_GatheringFuture的实例)。
group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Third")
                 )
  1. 此代码等待将来超时完成。而且,如果发生超时,它将取消并增加asyncio.TimeoutError
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")
  1. 发生超时。让我们看一下异步库task.pywait_for执行以下操作:
timeout_handle = loop.call_later(timeout,_release_waiter,waiter)
...
await waiter
...
await _cancel_and_wait(fut,loop=loop)  # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
  1. 当我们执行_GatheringFuture.cancel()时,如果实际上取消了任何子任务,则CancelledError会传播
class _GatheringFuture(futures.Future):
    ...
    def cancel(self):
        ...
        for child in self._children:
            if child.cancel():
                ret = True
        if ret:
            # If any child tasks were actually cancelled,we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret

后来

...
if outer._cancel_requested:
    # If gather is being cancelled we must propagate the
    # cancellation regardless of *return_exceptions* argument.
    # See issue 32684.
    outer.set_exception(exceptions.CancelledError())
else:
    outer.set_result(results)
  1. 因此,从收集future中提取结果或异常更为正确
async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")

    try:
        result = await group_task
    except asyncio.CancelledError:
        print("Gather was cancelled")
,

我认为您需要将 await 放在 asyncio.gather 之前。 所以这个调用来自您的代码:

    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Third")
                 )

需要改成:

    group_task = await asyncio.gather(
                     keep_printing("First"),keep_printing("Third")
                 )

不知道为什么,我还在学习这些东西。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-