如何解决如何获得气流中dag运行的最新执行时间
我尝试了以下代码,但仍然遇到问题
from airflow.models DagModel
def get_latest_execution_date(**kwargs):
session = airflow.settings.Session()
f = open("/home/Insurance/InsuranceDagsTimestamp.txt","w+")
try:
Insurance_last_dag_run = session.query(DagModel)
for Insdgrun in Insurance_last_dag_run:
if Insdgrun is None:
f.write(Insdgrun.dag_id+",9999-12-31"+"\n")
else:
f.write(Insdgrun.dag_id+","+ Insdgrun.execution_date+"\n")
except:
session.rollback()
finally:
session.close()
t1 = PythonOperator(
task_id='records',provide_context=True,python_callable=get_latest_execution_date,dag=dag)
有什么方法可以修复并获取最新的dag运行时信息
解决方法
PythonOperator
op_args
参数是模板化的。
可调用对象仅将最新执行日期写入文件,因此您可以通过以下方式实现该功能:
def store_last_execution_date(execution_date):
'''Appends latest execution date to a file
:param execution_date: The last execution date of the DagRun.
'''
with open("/home/Insurance/InsuranceDagsTimestamp.txt","w+") as f:
f.write(execution_date)
t1 = PythonOperator(
task_id="records",provide_context=True,python_callable=store_last_execution_date,op_args=[
"{{dag.get_latest_execution_date()}}",],dag=dag
)
,
有多种方法可以获取DagRun的最新执行。一种方法是利用Airflow DagRun模型。
from airflow.models import DagRun
def get_most_recent_dag_run(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dag_runs.sort(key=lambda x: x.execution_date,reverse=True)
return dag_runs[0] if dag_runs else None
dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
print(f'The most recent DagRun was executed at: {dag_run.execution_date}')
您可以在Airflow Docs located here中找到有关DagRun模型及其属性的更多信息。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。