如何解决下载文件时 Apache 气流 ssh_hook 随机断开连接
我正在处理这个问题底部的代码。
我的目的是访问 SFTP 站点,检查新文件夹,然后下载 zip 文件(然后解压缩)。有时没有新文件夹,有时可能会有一个或多个新文件夹。
脚本看似正常运行,但 SFTP 连接随机断开,没有警告。我正在寻找一种方法来保持连接打开,直到所有文件都下载完成。
import airflow
from airflow import models
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.operators.bash_operator import BashOperator
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.operators.gcs import GoogleCloudStorageDeleteOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.hooks.ftp_hook import FTPHook
from airflow.utils import dates
from airflow.models import Variable
import logging
from zipfile import ZipFile
import os,glob,subprocess
# Default arguments applied to every task in the DAG.
args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['sinistersparrow1701@gmail.com', 'rich@offrs.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'schedule_interval': '0 1 * * *',
}
def _retrieve_with_retry(ftp, remote_path, localfile, retries=3):
    """Download remote_path to localfile, reconnecting and retrying on drops.

    The FTP server closes busy connections randomly mid-transfer; each failed
    attempt tears down the stale connection, re-establishes it via
    ftp.get_conn(), and retries.  Re-raises the last error after `retries`
    failed attempts.
    """
    for attempt in range(1, retries + 1):
        try:
            ftp.retrieve_file(remote_path, localfile)
            return
        except Exception:
            logging.exception(
                "transfer of %s failed (attempt %s/%s)",
                remote_path, attempt, retries)
            if attempt == retries:
                raise
            # Drop the (likely dead) connection and open a fresh one
            # before retrying.
            try:
                ftp.close_conn()
            except Exception:
                pass
            ftp.get_conn()


def GetFiles(**kwargs):
    """Branch callable: fetch new zip files from the FTP site and pick a path.

    Scans /SmartZip/ for dated folders newer than the 'mls_publish_date'
    Variable, downloads every file in each new folder, extracts each zip into
    the temp MLS directory, and deletes the local zip.

    Returns:
        str: 'Upload_Files_to_GCS' if at least one file was downloaded,
        otherwise 'Populate_Valuation_LeadsMLS'.
    """
    foundfiles = False
    FTPPATH = '/SmartZip/'
    ftp = FTPHook(ftp_conn_id='FTP_DataTree_MLS')
    ftp.get_conn()
    try:
        # Keep only directory entries ("d" prefix in the long listing) whose
        # names start with "2" (date-named folders like 20200101...).
        folders = [x.split()[-1] for x in ftp.list_directory(FTPPATH, nlst=False)
                   if x.startswith("d") and x.split()[-1].startswith("2")]
        print(folders)
        logging.info("NUMBER OF FOLDERS -> {}.".format(len(folders)))
        if len(folders) > 0:
            for folder in folders:
                path = FTPPATH + folder
                logging.info("looking in folder {}.".format(path))
                # Only process folders newer than the stored watermark.
                if int(folder[0:8]) > int(Variable.get("mls_publish_date")):
                    files = [x.split()[-1] for x in ftp.list_directory(path, nlst=False)
                             if str(x.split()[-1]) != '.' and str(x.split()[-1]) != '..']
                    logging.info("number of files -> {}".format(len(files)))
                    if len(files) > 0:
                        foundfiles = True
                        for file in files:
                            localfile = Variable.get("temp_directory") + "MLS/" + file
                            logging.info("retrieving file {}/{}".format(path, file))
                            # Retry with reconnect: the link drops randomly.
                            _retrieve_with_retry(ftp, path + '/' + file, localfile)
                            # Context manager guarantees the archive is closed
                            # even if extraction fails.
                            with ZipFile(localfile, 'r') as zf:
                                zf.extractall(Variable.get("temp_directory") + "MLS/")
                            os.remove(localfile)
                    # Advance the watermark only AFTER the folder was fully
                    # processed, so a failed run re-processes it next time
                    # instead of silently skipping it.
                    Variable.set("mls_publish_date", int(folder[0:8]))
    finally:
        # Always release the connection, even on failure.
        try:
            ftp.close_conn()
        except Exception:
            logging.warning("could not close FTP connection cleanly")
    if foundfiles:
        return 'Upload_Files_to_GCS'
    else:
        return 'Populate_Valuation_LeadsMLS'
# DAG definition: daily MLS data ingestion from the DataTree FTP site.
dag = models.DAG(
    dag_id='DataTree_MLS_Ingestion',
    default_args=args,
)

# Branching entry point: downloads any new files and routes the run either
# to the GCS upload path or straight to the fallback task.
check_for_file = BranchPythonOperator(
    task_id='Check_FTP_and_Download',
    provide_context=True,
    python_callable=GetFiles,
    dag=dag,
)
如何强制 SFTP 保持打开状态直到一切完成? 谢谢!
解决方法
如果繁忙的连接在传输过程中被服务器端关闭,客户端几乎无法阻止这种情况发生——没有可靠的办法强制服务器保持连接打开。
因此必须在代码中优雅地处理断开:捕获传输异常,调用 hook 的 close_conn() 丢弃失效连接,再用 get_conn() 重新建立连接,并对失败的文件传输进行有限次数的重试。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。