如何解决 Airflow 计划任务未按计划时间启动的问题
在 Airflow 中使用 ExternalTaskSensor 时遇到问题:排定在 7:45 和 19:45 运行的第一个 DAG,在两个调度周期过去之后仍然没有开始运行。
from datetime import timedelta,datetime
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
import pendulum
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
from dag_constants import LOCAL_TIMEZONE
# Resolve the deployment timezone once so every start_date below is tz-aware.
local_tz = pendulum.timezone(LOCAL_TIMEZONE)

# Default arguments applied to every task in the DAGs below; individual
# operators may override any of these at construction time.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Airflow only creates a run after one full schedule interval has
    # elapsed past start_date, so this must be in the past.
    'start_date': datetime(2020, 8, 27, tzinfo=local_tz),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'wait_for_downstream': False,
}

# Daily download at 07:45 local time.
EVERYDAY_8AM_DAG = DAG(
    'NEWBACK_DOWNLOADER_EVERY_7_45',
    default_args=DEFAULT_ARGS,
    description='KPC OB Shift KPI History DAG',
    schedule_interval='45 7 * * *',
    catchup=False,
)

# Daily download at 19:45 local time.
# BUG FIX: the original DAG passed no default_args, so it had no start_date
# and the scheduler could never create runs for it.
EVERYDAY_8PM_DAG = DAG(
    'NEWBACK_DOWNLOADER_EVERY_19_45',
    default_args=DEFAULT_ARGS,
    description='KPC OB Shift KPI History DAG',
    schedule_interval='45 19 * * *',
    catchup=False,
)

EVERYDAY_8AM_DAG.doc_md = __doc__
EVERYDAY_8PM_DAG.doc_md = __doc__

# Shell command shared by both download tasks.
TEMPLATE_COMMAND = """
/usr/local/bin/python /root/pipeline/src/pipeline/pipeline_newback_shift_data.py
"""

T1 = BashOperator(
    task_id='8AM_newback_downloader',
    depends_on_past=False,
    bash_command=TEMPLATE_COMMAND,
    dag=EVERYDAY_8AM_DAG,
)
# BUG FIX: BashOperator requires bash_command; the original T2 omitted it,
# which makes the whole DAG file fail to import.
T2 = BashOperator(
    task_id='8PM_newback_downloader',
    depends_on_past=False,
    bash_command=TEMPLATE_COMMAND,
    dag=EVERYDAY_8PM_DAG,
)

T1.doc_md = """\
#### Task Documentation
Download newback history data at 7.45 AM
"""
T2.doc_md = """\
#### Task Documentation
Download newback history data at 7.45 PM
"""
然后是第二个脚本,它依赖于上面第一个 DAG 的运行结果:
from datetime import timedelta,datetime
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# To set dependency to other task
from airflow.operators.sensors import ExternalTaskSensor
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
import pendulum
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
from dag_constants import LOCAL_TIMEZONE
# Resolve the deployment timezone once so the start_date below is tz-aware.
local_tz = pendulum.timezone(LOCAL_TIMEZONE)

DEFAULT_ARGS = {
    'owner': 'airflow',
    # BUG FIX: the original default_args had no start_date, so the
    # scheduler never created runs for these DAGs at all.
    'start_date': datetime(2020, 8, 27, tzinfo=local_tz),
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

# These DAGs must use the SAME cron expressions as the upstream downloader
# DAGs (45 7 / 45 19) so each run's execution_date matches the upstream run
# the ExternalTaskSensor polls for. The originals had no schedule_interval
# (defaulting to daily at midnight), so the sensors waited forever on an
# upstream execution_date that never existed — the reported symptom.
EVERYDAY_8AM_DAG = DAG(
    'OB_SHIFT_KPI_HISTORY_PIPELINE_EVERYDAY_8AM',
    default_args=DEFAULT_ARGS,
    description='KPC OB Shift KPI Pipeline DAG',
    schedule_interval='45 7 * * *',
    catchup=False,
)
EVERYDAY_8PM_DAG = DAG(
    'OB_SHIFT_KPI_HISTORY_PIPELINE_EVERYDAY_8PM',
    default_args=DEFAULT_ARGS,
    description='KPC OB Shift KPI Pipeline DAG',
    schedule_interval='45 19 * * *',
    catchup=False,
)

EVERYDAY_8AM_DAG.doc_md = __doc__
EVERYDAY_8PM_DAG.doc_md = __doc__

# Shell command shared by both pipeline tasks.
TEMPLATE_COMMAND = """
cd /root/pipeline/cron_jobs/ && ./everyday_8_am_pm.sh
"""

# BUG FIX: the original operators had neither the required bash_command nor
# a dag= argument, so they failed to construct / were never attached to a DAG.
T1 = BashOperator(
    task_id='every_day_at_8_am_task',
    bash_command=TEMPLATE_COMMAND,
    dag=EVERYDAY_8AM_DAG,
)
T2 = BashOperator(
    task_id='every_day_at_8_pm_task',
    bash_command=TEMPLATE_COMMAND,
    dag=EVERYDAY_8PM_DAG,
)

T1.doc_md = """\
#### Task Documentation
Run shift kpi history at 07.45 AM
"""
T2.doc_md = """\
#### Task Documentation
Run shift kpi history at 07.45 PM
"""

# Wait for the matching run of NEWBACK_DOWNLOADER_EVERY_7_45.
# BUG FIX: the original call was truncated mid-argument (unclosed paren,
# `start_date=datetime(2020,`) and had no dag=. start_date comes from
# DEFAULT_ARGS; with identical crons no execution_delta is needed.
wait_for_newback_data_8am = ExternalTaskSensor(
    task_id='wait_for_newback_data_8am',
    external_dag_id='NEWBACK_DOWNLOADER_EVERY_7_45',
    external_task_id='8AM_newback_downloader',
    dag=EVERYDAY_8AM_DAG,
)
# Wait for the matching run of NEWBACK_DOWNLOADER_EVERY_19_45.
wait_for_newback_data_8pm = ExternalTaskSensor(
    task_id='wait_for_newback_data_8pm',
    external_dag_id='NEWBACK_DOWNLOADER_EVERY_19_45',
    external_task_id='8PM_newback_downloader',
    dag=EVERYDAY_8PM_DAG,
)

# Run the pipeline tasks only after the corresponding downloader succeeded.
wait_for_newback_data_8am >> T1
wait_for_newback_data_8pm >> T2
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。