如何解决如何测试on_failure_callback气流操作员
我有一种情况,我想对在Airflow DAG中通过on_failure_callback
调用的操作员进行集成测试。
此DAG的一个最小示例如下:
def failure_callback(context):
# CustomOperator in this case links to an external K8s service
handle_failure = CustomOperator(
task_id="handle_failure",timestamp=context["ts"]
)
handle_failure.execute(context=context)
args = {
"catchup": False,"retries": 3,"retry_delay": timedelta(seconds=30),"start_date": START_DATE,"on_failure_callback": failure_callback,}
with DAG("foo",schedule_interval=None,default_args=args) as dag:
task_to_fail = SomeOperator()
我的第一个测试想法是运行task_to_fail
,让它失败,并通过其他一些过程验证failure_callback
的结果,尝试以下操作:
import pytest
from airflow.models import DagBag,TaskInstance
from dateutil import parser
@pytest.fixture
def foo_dag():
dag_id = "foo"
dag_bag = DagBag("dags")
return dag_bag.dags[dag_id]
@pytest.mark.integration
def test_task_to_fail(foo_dag):
execution_date = parser.parse("2000-01-01T00:00+00:00")
task_id = "task_to_fail"
task = foo_dag.get_task(task_id=task_id)
task_instance = TaskInstance(task,execution_date)
with pytest.raises(Exception):
task_instance.run(ignore_task_deps=True,ignore_ti_state=True,test_mode=True)
assert "INSERT DESIRED OUTCOME OF `failure_callback` HERE"
我遇到的问题是,运行failure_callback
时似乎没有调用pytest
。我怀疑这是由于TaskInstance的调用方式(即未运行on_failure_callback
,但不确定。
我的问题:
- 这是验证此回调行为的正确方法吗?如果没有,应该如何处理?
- 在
task_to_fail
任务的上游,我想避免在测试期间运行许多昂贵的操作。是否有可能从特定任务(在这种情况下为pytest
开始,用task_to_fail
执行的完整DAG?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。