在 run() 中产生任务时 Luigi 中的 TaskClassAmbigiousException

如何解决在 run() 中产生任务时 Luigi 中的 TaskClassAmbigiousException

我在 Luigi 中遇到一个我不明白的错误。我不知道这是一个已知问题、Luigi 的限制还是我做错了什么。

我在一个实际问题中使用 Luigi,有许多任务和许多依赖。但是,我已经做了一个玩具示例,其中清楚地显示了这个问题。

让我们考虑两个任务,TaskA 和 TaskB,TaskA 需要使用不同的 Luigi 参数值执行 TaskB 的两个先前实例。

如果我在 TaksA 的 requires() 方法中编写依赖项,那么不会有什么不好的事情发生。所有三个任务都执行,我写了我的退出文件。

但是如果我在 TaksA 的 run() 方法中编写依赖项,那么我会得到丑陋的 TaskClassAmbigiousException。

在我的实际问题中,我无法在 requires() 方法中产生任务,因为我需要知道在 requieres() 方法中也产生的前一个任务的结果,因此我尝试在run() 并得到相同的异常。

好的,这是玩具示例的代码。首先,在 requieres() 中产生任务,它可以工作:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        yield TaskB(j=self.i)
        yield TaskB(j=self.i+1)

    def run(self):
        print_file = ""
        for input_target in self.input():
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)

其次,在 run() 中产生任务,我得到了这个:

 File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py",line 1081,in _handle_next_task
    for module,name,params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py",in <listcomp>
    for module,params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py",line 251,in load_task
    task_cls = Register.get_task_cls(task_name)

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py",line 181,in get_task_cls
    raise TaskClassAmbigiousException('Task %r is ambiguous' % name)

代码:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1,target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA],workers=1,local_scheduler=True,log_level='WARNING')

编辑:我编辑添加另一个相关问题。因为我想要做的是产生一个带有参数的任务,该参数依赖于先前产生的任务,如果这对我来说是可能的,那就足够了:

  def requires(self):
        taskb_target = yield TaskB(j=self.i)
        taskb_target.open('r')
# do something and yield next Task depending on what taskb_target has
        yield TaskB(j=self.i+1)

但不幸的是,这不起作用。 Luigi 说“NoneType”对象没有“open”属性。

但是,当您在 run() 方法中生成任务时,您可以在运行时访问输出。好像有很大的不对称……

第二次编辑:

我一直在做更多的试验,我发现了一个奇怪的结论:我在原始问题中编写的第二段代码(在 .py 文件中时)可以永远执行,即使删除输出文件和所以迫使 luigi 重新执行任务。但是,第一段代码只能执行一次(然后,在第一次执行时,它可以工作!!)。但是如果你删除文件并再次执行代码,你会得到模糊任务错误。

我认为这与luigi的Register对象有关。但真正让我困惑的是,无论我在 requieres 还是 run 方法中生成 taskB,这种行为都是不同的。

不知道是不是在重新定义luigi的Register模块中已经存在的类Task时出现问题。可能是...我还尝试将类定义放在与主 .py 不同的 .py 中,但是当运行两次时它会中断。正常运行的唯一方法就是重启内核,你只有一次机会!

解决方法

当您使用 yield 时,您不会获得返回值,因为您基本上是从协同程序中return获取一个值。我实际上很惊讶在 yield 内使用 requires 对您有用,因为它导致我崩溃。你要做的是先定义然后让出任务。因此,例如,您将:

class TaskA(luigi.Task):
    def run(self):
        task_1 = TaskB(j=self.i)
        yield task_1
        with task_1.output().open('r') as in_file:
            # Get data

        task_2 = TaskB(j=self.i+1,...)
        yield task_2
        ...
,

好的,现在我发现只有在执行两次脚本时才会出现问题。然后试了一下,发现问题是,再次导入TaskA和TaskB的时候,又在luigi.task_register.Register中重新注册了。

其实Register有一个属性_reg,里面包含了所有注册的类。并且在模块的第二次执行中,再次注册了TaskA和TaskB。我不知道为什么。这很奇怪,但确实如此。而且只有在导入TaskA时才会出现,更奇怪。

因此,我发现解决此问题的方法如下:

import luigi
from luigi import task_register

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1,target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

taskA_list = [c for c in task_register.Register._reg if c.__name__ == 'TaskA']
if len(taskA_list) > 1:
   task_register.Register._reg.pop()
   task_register.Register._reg.pop()
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA],workers=1,local_scheduler=True,log_level='WARNING')

这是一个有效的技巧,可以使模块永久重新执行,无论是删除还是不删除输出文件。但显然不是最优雅的解决方案。我写它只是为了帮助 luigi 开发人员修复它,或者如果我做错了什么来纠正我!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-