如何解决定义Airflow DAG任务顺序的正确方法
我在DAG中有相当长的一组任务,每个任务都有相当长的task_id
,细节都是相关的,并且命名不能缩短。
目前,我将其编写为:
a_very_long_long_named_task_1 >> a_very_long_long_named_task_2 >> a_very_long_long_named_task_3 >> a_very_long_long_named_task_4 >> a_very_long_long_named_task_5
在其他DAG中,尽管重复,但我将其分为多行:
a_very_long_long_named_task_1 >> a_very_long_long_named_task_2
a_very_long_long_named_task_2 >> a_very_long_long_named_task_3
a_very_long_long_named_task_3 >> a_very_long_long_named_task_4
a_very_long_long_named_task_4 >> a_very_long_long_named_task_5
推荐哪个?是否有最佳实践,或者是定义任务顺序的另一种更好的方法?
解决方法
- 创建(实例化)任务时,您可以继续将任务添加到
python
list
(或dict
/类似)中 - 然后最后可以以编程方式连接它们
请注意,该代码段未经测试
from typing import List
from airflow.models.baseoperator import BaseOperator
my_tasks: List[BaseOperator] = [
a_very_long_long_named_task_1,a_very_long_long_named_task_2,a_very_long_long_named_task_3,a_very_long_long_named_task_4,a_very_long_long_named_task_5
]
..
# define a utility method to set dependencies b/w tasks
def wire_tasks(my_tasks: List[BaseOperator]) -> None:
"""
A utility method that accepts a list of tasks and links them up
:param my_tasks: List of tasks (operator instances)
:type my_tasks: List[BaseOperator]
:return None
"""
for i in range(1,len(my_tasks)):
# this is equivalent to my_tasks[i - 1].set_upstream(my_tasks[i])
my_tasks[i - 1] >> my_tasks[i]
# call the utility method to wire the tasks
wire_tasks(my_tasks=my_tasks)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。