如何解决Apache airflow-无法对从baseOperator继承的队列名称进行模板化
我有一个Custom运算符,它继承了class CheckBoxInListView extends StatefulWidget {
@override
_CheckBoxInListViewState createState() => _CheckBoxInListViewState();
}
class _CheckBoxInListViewState extends State<CheckBoxInListView> {
final List<SimpleModel> _items = <SimpleModel>[
SimpleModel('InduceSmile.com',false),SimpleModel('Flutter.io',SimpleModel('google.com',SimpleModel('youtube.com',SimpleModel('yahoo.com',SimpleModel('gmail.com',];
@override
Widget build(BuildContext context) => ListView(
padding: const EdgeInsets.all(8),children: _items
.map(
(SimpleModel item) => CheckboxListTile(
title: Text(item.title),value: item.isChecked,onChanged: (bool val) {
setState(() => item.isChecked = val);
},),)
.toList(),);
}
class SimpleModel {
String title;
bool isChecked;
SimpleModel(this.title,this.isChecked);
}
。我正在尝试对“队列”名称进行模板化,以便可以由其他Celery工作人员来接任务。
但是它使用原始模板字符串(未渲染的jinja字符串)作为队列名称,而不是渲染的字符串。
如果我直接将预期的队列名称作为简单字符串给出,则相同的流程也有效。
baseoperator
在上述情况下:
- 在RabbitMQ中,我看到任务正在队列名称'{{dag_run.conf [“ queue”]}}''(原始模板字符串)下排队
- 在气流中,
from airflow import DAG from operators.check_operator import CheckQueueOperator from datetime import datetime,timedelta from airflow.operators.python_operator import BranchPythonOperator from airflow.utils.dates import days_ago default_args = { 'schedule_interval': None,# exclusively “externally triggered” DAG 'owner': 'admin','description': 'This helps to quickly check queue templatization','start_date': days_ago(1),'retries': 0,'retry_delay': timedelta(minutes=5),'provide_context': True } # this goes to wrong queue --> {{ dag_run.conf["queue"]}} with DAG('test_queue',default_args=default_args) as dag: t1 = CheckQueueOperator(task_id='check_q',queue='{{ dag_run.conf["queue"]}}' )
下的Rendered template
字段可以看到正确渲染的值 - 在屏幕截图中,我们看到
queue
作为队列名称。这是我的测试队列,也是我的默认气流队列。如果我将此队列名称指定为直接字符串,则效果很好。
docker-desktop
CheckQueueOperator代码:
#this goes to right queue --> my_target_queue
with DAG('test_queue',queue='my_target_queue'
)
堆栈详细信息:
- Apache Airflow版本-1.10.12
- 使用CeleryExecutor
- 使用RabbitMQ
解决方法
queue
保留了BaseOperator
属性(也许不是正式的,但实际上是),尽管您可能可以通过网络服务器来渲染该属性,在读取queue
属性之前,处理调度和任务执行的Airflow部分不执行渲染。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。