如何解决Apache Airflow中的动态FTPSensor
我想实现一种动态FTPSensor。使用贡献的FTP传感器,我设法以这种方式工作:
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",path="./data/test.txt",ftp_conn_id="ftp_default",poke_interval=5,dag=dag,)
,效果很好。但是我需要传递动态路径和ftp_conn_id参数。即我在上一个任务中生成了一堆新连接,并且在ftp_sensor任务中,如果FTP上存在文件,我想检查以前生成的每个新连接。
所以我首先想到了从XCom那里获取连接ID的方法。 我从XCom中的上一个任务发送了它们,但是看来我无法在任务之外访问XCom。 例如。我的目标是:
active_ftp_connections = context['ti'].xcom_pull(key='active_ftps')
for conn in active_ftp_connections:
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",path=conn['path'],ftp_conn_id=conn['connection'],)
但这似乎不是解决方案。
然后,我只是浪费大量时间尝试创建自己的FTPSensor,将需要的数据动态传递给它,但是现在我得出的结论是,我需要传感器和操作员之间的混合,因为我需要例如保留戳功能,但也具有执行功能。 我猜一个选择是编写一个自定义操作符,该操作符可实现传感器基类中的戳,但现在可能太累了,无法尝试这样做。
您是否知道如何实现我的目标?我似乎在互联网上找不到有关该主题的任何材料-也许只有我一个。 如果问题不清楚,请告诉我,以便我提供更多详细信息。
更新
我现在认为有可能
def get_active_ftps(**context):
active_ftp_connestions = context['ti'].xcom_pull(key='active_ftps')
return active_ftp_connestions
for ftp in get_active_ftps():
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",path="./"+ ftp['folder'] +"/test.txt",ftp_conn_id=ftp['conn_id'],)
但会引发错误:Broken DAG: [/usr/local/airflow/dags/copy_file_from_ftp.py] 'ti'
解决方法
我设法做到了:
active_ftp_folder = Variable.get('active_ftp_folder')
active_ftp_conn_id = Variable.get('active_ftp_conn_id')
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",path="./"+ active_ftp_folder +"/test.txt",ftp_conn_id=active_ftp_conn_id,poke_interval=5,dag=dag,)
并且因为我意识到直接无环图中不应该存在循环,所以dag一次只能运行一个ftp帐户。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。