如何解决由于连接到Airflow DB的DAG代码,Apache Airflow在initdb上冻结
它成功创建了airflow initdb
表,但冻结了步骤:
INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f,increase_length_for_connection_password
Alembic升级冻结,因为我的DAG中包含此代码
session = settings.Session()
conns: Iterable[Connection] = (
session.query(Connection.conn_id)
.filter(and_(
Connection.conn_id.ilike(f'{CONN_PREFIX}%'),Connection.conn_type == CONN_TYPE,))
.all()
return [conn.conn_id for conn in conns]
我使用它来基于具有特殊前缀的Airflow Connections快速创建任务。
但是Airflow在initdb
命令中运行DAGs代码。
因此,我的代码锁定表connection
和Alembic升级脚本无法更改它并冻结。
死锁。
据我了解,我必须以某种方式释放DAG代码中的锁定。 重新开启交易? 该怎么做?
解决方法
好,当我终于了解了问题的时候,解决方案就很简单
from typing import Iterable
from airflow import settings
from airflow.models import Connection
from sqlalchemy import and_
CONN_TYPE = 'fs'
CONN_PREFIX = 'my_special_conn_'
session = settings.Session()
try:
conns: Iterable[Connection] = (
session.query(Connection.conn_id)
.filter(and_(
Connection.conn_id.ilike(f'{CONN_PREFIX}%'),Connection.conn_type == CONN_TYPE,))
.all()
)
conn_ids = [conn.conn_id for conn in conns]
finally:
session.commit()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。