如何解决AIRFLOW-JINJA未在DAG中渲染
我正在创建一个传递任意数量的运算符和参数并创建DAG的函数,我在渲染jinja时遇到问题,我在相关的运算符中更新了template_fields
,但是没有人遇到过运用这个吗?
def date_offset(active_day_interval,dag,opr_array=[]
):
with dag:
for i in range(active_day_interval):
run_check = CheckOperator(
task_id=f'{dag.dag_id}_run_check_{i}',day_offset=i,dag=dag
)
params = {
"date": "{{ ds }}"
}
task_list = []
for opr in opr_array:
new_id = opr['class_args']['task_id'] + f"_{i}"
op_params = opr['class_args'].copy()
op_params.update({
"task_id": new_id,"day_offset": i,"dag": dag,"xcom_loc": new_id,"params": params
})
operator = opr['class_name']
task_operator = operator(**op_params)
task_list.append(task_operator)
chain(run_check,*task_list)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。