如何解决Luigi Workflows 中的条件分支
我是 Luigi 的新手,并试图在我的流程中创建一个条件分支。分支任务会评估一个条件,并根据结果跳过它的一些子任务。
为了测试这一点,我只有一个虚拟任务,它检查当前小时,如果流程在早上执行,则返回 True,否则返回 False。此任务有两个子任务,一个在控制台中打印“Morning”,另一个打印“Afternoon”。根据分支任务的结果,一个被激活,另一个被跳过。这是在 Prefect 中的样子:
您可以在这里看到流程是在上午执行的,因此跳过了下午的任务。
经过一番研究,我不知道路易吉是否有能力做这样的事情。到目前为止我尝试过的是:
# Third Task
class Branch(luigi.Task):
def requires(self):
return Sleep()
def output(self):
return luigi.LocalTarget('condition.txt')
def run(self):
date = str(datetime.now())
hour = int (date.split()[1].split(':')[0])
with self.output().open('w') as out:
if hour<12:
out.write('morning')
open("afternoon.txt","w").close() # create afternoon Target skips afternoon task
else:
out.write('afternoon')
open("morning.txt","w").close() # create morning Target skips afternoon task
# Fourth Task depends on result of branch
class Morning(luigi.Task):
def requires(self):
return Branch()
def output(self):
return luigi.LocalTarget('morning.txt')
def run(self):
print ("Morning")
with self.output().open('w') as out:
out.write("Morning")
class Afternoon(luigi.Task):
def requires(self):
return Branch()
def output(self):
return luigi.LocalTarget('afternoon.txt')
def run(self):
print ("Afternoon")
with self.output().open('w') as out:
out.write("Afternoon")
# Fifth task is a merge after the branch
class Merge(luigi.WrapperTask):
def requires(self):
yield Morning()
yield Afternoon()
def run(self):
print ("Merged")
据我所知,Luigi 只会在其输出尚不存在的情况下执行任务。我当时的想法是创建任务的输出文件来跳过以防止它执行,但它不起作用。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。