如何解决Airflow Scheduler未执行计划的作业,并且未生成日志
环境:
- RHEL 8.2 x86_64
- 气流1.10.10
- PostgreSQL 12.3
- Python 3.6 设置:
- Airflow以用户“ svc_etl”的身份运行,该用户具有通过组和用户访问Airflow主文件夹,DAG和日志文件夹的权限
- Windows Samba共享上的DAG文件夹位置(链接的文件夹)
- Windows Samba共享上的任务日志文件夹位置
- Postgres和Airflow作为服务(systemctl)在同一台服务器(VM)上运行
- 服务服务器上的Airflow home文件夹 此设置是由于预算限制所致。
问题: DAG上的计划不会执行,并且不会以失败状态登录而结束。在某些情况下,在DAG的树视图中使用选项“清除”会成功执行第一个任务。对Clear的任何进一步使用都会导致执行后续任务。 这些任务可以是完整的模块,也可以是单独的功能。 Airflow Scheduler日志中没有错误,任务本身也没有日志。
采取的步骤:
- 停止Airflow调度程序,从文件夹和数据库中删除所有DAG及其日志。重新启动Airflow Scheduler,将DAG复制回文件夹并等待。 DAG重新出现在GUI和数据库中。预定任务仍将在GUI和数据库中显示为失败。清除错误会导致第一个任务成功执行,然后进一步清除后,第一个任务和后续任务将成功执行(并写入任务日志)。
- 手动执行可以导致(取决于DAG定义)相同的行为。
- DAG在另一台仅顺序执行的Airflow测试服务器上成功运行。两台服务器的Log和DAG文件夹位于不同的位置,并且彼此分开。
问题:
- 如何找出Airflow / Postgres的哪个组件会产生这种行为?
气流配置:
[core]
# The folder where your airflow pipelines live,most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /ABTEILUNG/DWH/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
# > writing logs to the GIT main folder is a very bad idea and needs to change soon!!!
base_log_folder = /ABTEILUNG/DWH/airflow/logs
# Airflow can store logs remotely in AWS S3,Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true,see UPDATING.md for additional
# configuration requirements.
remote_logging = False
remote_log_conn_id =
remote_base_log_folder =
encrypt_s3_logs = False
# Logging level
logging_level = INFO
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =
# Log format
# Colour the logs when the controlling terminal is a TTY.
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/logs/dag_processor_manager/dag_processor_manager.log
# Hostname by providing a path to a callable,which will resolve the hostname
# The format is "package:function". For example,# default value "socket:getfqdn" means that result from getfqdn() of "socket" package will be used as hostname
# No argument should be required in the function specified.
# If using IP address as hostname is preferred,use value "airflow.utils.net:get_host_ip_address"
hostname_callable = socket:getfqdn
# Default timezone in case supplied date times are naive
# can be utc (default),system,or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = Europe/Berlin
# The executor class that airflow should use. Choices include
# SequentialExecutor,LocalExecutor,CeleryExecutor,DaskExecutor,KubernetesExecutor
#Default:
#executor = SequentialExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine,more information
# their website
#sql_alchemy_conn = sqlite:////home/airflow/airflow.db
#! this needs to change to MySQL or Postgres database
sql_alchemy_conn = postgresql+psycopg2://localhost:5436/airflow_db?user=svc_etl&password=<some-hard-password>
# The encoding for the databases
sql_engine_encoding = utf-8
# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True
# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5
# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool,they are disconnected and discarded.
# It follows then that the total number of simultaneous connections the pool will allow is pool_size + max_overflow,# and the total number of "sleeping" connections the pool will allow is pool_size.
# max_overflow can be set to -1 to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections. Defaults to 10.
sql_alchemy_max_overflow = 10
# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800
# Check connection at the start of each connection pool checkout.
# Typically,this is a simple statement like “SELECT 1”.
# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
sql_alchemy_pool_pre_ping = True
# The schema to use for the metadata database
# SqlAlchemy supports databases with the concept of multiple schemas.
sql_alchemy_schema =
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
# > actually this parameter is more like max_active_tasks per Airflow state database across all processing servers
# > Default original: parallelism = 32
parallelism = 6
# The number of task instances allowed to run concurrently by the schedule
# > again more like max_active_tasks_for_worker (process)
# > Default original: dag_concurrency = 16
dag_concurrency = 4
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# The maximum number of active DAG runs per DAG
# > Default original: max_active_runs_per_dag = 16
max_active_runs_per_dag = 4
# Whether to load the examples that ship with Airflow. It's good to
# get started,but you probably want to set this to False in a production
# environment
load_examples = False
# Where your Airflow plugins are stored
plugins_folder = /home/airflow/plugins
# Secret key to save connection passwords in the db
fernet_key =
#! this needs to be set to enable obfuscated password shown in GUI
# Whether to disable pickling dags
donot_pickle = False
# How long before timing out a python file import
dagbag_import_timeout = 30
# How long before timing out a DagFileProcessor,which processes a dag file
dag_file_processor_timeout = 120
# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner
# If set,tasks without a `run_as_user` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
default_impersonation =
# What security module to use (for example kerberos):
security =
# If set to False enables some unsecure features like Charts and Ad Hoc Queries.
# In 2.0 will default to True.
secure_mode = False
# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False
# Name of handler to read task instance logs.
# Default to use task handler.
task_log_reader = task
# Whether to enable pickling for xcom (note that this is insecure and allows for
# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
enable_xcom_pickling = True
# When a task is killed forcefully,this is the amount of time in seconds that
# it has to cleanup after it is sent a SIGTERM,before it is SIGKILLED
killed_task_cleanup_time = 60
# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`,the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False
# Worker initialisation check to validate Metadata Database connection
worker_precheck = False
# When discovering DAGs,ignore any files that don't contain the strings `DAG` and `airflow`.
dag_discovery_safe_mode = True
# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0
[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly,while the json_client will use the api running on the
# webserver
api_client = airflow.api.client.local_client
# If you set web_server_url_prefix,do NOT forget to append it here,ex:
# endpoint_url = http://localhost:8080/myroot
# So api will look like: http://localhost:8080/myroot/api/experimental/...
endpoint_url = http://localhost:8080
[api]
# How to authenticate users of the API
auth_backend = airflow.api.auth.backend.default
[lineage]
# what lineage backend to use
backend =
[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =
[operators]
# The default owner assigned to each new operator,unless
# provided explicitly or passed via `default_args`
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[hive]
...
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080
...
# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
#Default:web_server_master_timeout = 120
web_server_master_timeout = 300
# Number of seconds the gunicorn webserver waits before timing out on a worker
#Default:web_server_worker_timeout = 120
web_server_worker_timeout = 300
# Number of workers to refresh at a time. When set to 0,worker refresh is
# disabled. When nonzero,airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
#Default:worker_refresh_interval = 30
worker_refresh_interval = 60
# Secret key used to run your flask app
secret_key = temporary_key
# Number of workers to run the Gunicorn web server
#Default:workers = 4
workers = 2
# The worker class gunicorn should use. Choices include
# sync (default),eventlet,gevent
worker_class = sync
# Log files for the gunicorn webserver. '-' means log to stderr.
#Default:access_logfile = -
#Default:error_logfile = -
access_logile = -
error_logfile = /home/airflow/gunicorn.err
...
# Default DAG view. Valid values are:
# tree,graph,duration,gantt,landing_times
dag_default_view = tree
# Default DAG orientation. Valid values are:
# LR (Left->Right),TB (Top->Bottom),RL (Right->Left),BT (Bottom->Top)
dag_orientation = LR
# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
#Default:log_fetch_timeout_sec = 5
log_fetch_timeout_sec = 30
# By default,the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False
# Consistent page size across all listing views in the UI
page_size = 100
...
# Define the color of navigation bar
#Default:navbar_color = #007A87
navbar_color = #1C33C7
# Default dagrun to show in UI
default_dag_run_display_number = 25
...
# Default setting for wrap toggle on DAG code and TI log views.
default_wrap = False
...
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries,failure,and you want to use
# the airflow.utils.email.send_email_smtp function,you have to configure an
# smtp server here
smtp_host = <valid-SMTP-server-connection>
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user =
# smtp_password =
smtp_port = 25
smtp_mail_from = <valid-domain-address@domain>
[sentry]
...
[celery]
...
[dask]
...
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI),this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
# Default original: scheduler_heartbeat_sec = 5
scheduler_heartbeat_sec = 20
# after how much time should the scheduler terminate in seconds
# -1 indicates to run continuously (see also num_runs)
run_duration = -1
# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1
# The number of seconds to wait between consecutive DAG file processing
# Default original: processor_poll_interval = 1
processor_poll_interval = 10
# after how much time (seconds) a new DAGs should be picked up from the filesystem
# Default original: min_file_process_interval = 0
min_file_process_interval = 10
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300
# How often should stats be printed to the logs
print_stats_interval = 30
# If the last scheduler heartbeat happened more than scheduler_health_check_threshold ago (in seconds),# scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 30
child_process_log_directory = /home/airflow/logs/scheduler
...
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work,but the scheduler
# will not do scheduler catchup if this is False,# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True
# This changes the batch size of queries in the scheduling main loop.
# If this is too high,SQL query performance may be impacted by one
# or more of the following:
# - reversion to full table scan
# - complexity of query predicate
# - excessive locking
#
# Additionally,you may hit the maximum allowable query length for your db.
#
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512
# Statsd (https://github.com/etsy/statsd) integration settings
...
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2
authenticate = False
# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True
[ldap]
...
[kerberos]
ccache = /tmp/airflow_krb5_ccache
# gets augmented with fqdn
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
...
[elasticsearch]
...
[kubernetes]
...
附录: 如果我忘记了任何内容,请发表评论,我会尽快添加。
解决方法
几个小时后,重新配置了所涉及的链接目录,我们找到了解决方案,并怀疑潜在的问题是什么。
在这种情况下,问题很可能是由于通过autofs而不是mount链接目录。 潜在问题很可能是:
- autofs与mount链接到同一用户,因此手动检查证明成功
- 链接的DAG文件夹与Airflow Scheduler服务器在不同域中的其他城镇的服务器上
- 但是,以用户身份运行的Airflow Scheduler无法通过autofs在其他域中找到目录(通过mount起作用)
用于使其正常运行的解决方案:
- 将DAG文件夹移动到位于Airflow Scheduler服务器域中的链接目录上的域中
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。