Airflow Scheduler does not execute scheduled jobs and produces no logs

How to resolve the Airflow Scheduler not executing scheduled jobs and not producing any logs.

Environment:

  • RHEL 8.2 x86_64
  • Airflow 1.10.10
  • PostgreSQL 12.3
  • Python 3.6

Setup:

  • Airflow runs as the user "svc_etl", which has access to the Airflow home folder, the DAG folder and the log folders via group and user permissions
  • DAG folder located on a Windows Samba share (linked folder)
  • Task log folder located on a Windows Samba share
  • Postgres and Airflow run as services (systemctl) on the same server (VM)
  • Airflow home folder on the service server

This setup is due to budget constraints.
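
Since several of these folders are links to a Samba share, a quick check that the service account can actually reach and write to them rules out plain permission problems. A minimal sketch, assuming the paths from the configuration further below, to be run as svc_etl (for example via sudo -u svc_etl python3 check_access.py):

import os

# Paths assumed from the setup and airflow.cfg in this question
paths = {
    "airflow home": "/home/airflow",
    "dags (Samba link)": "/ABTEILUNG/DWH/airflow/dags",
    "logs (Samba link)": "/ABTEILUNG/DWH/airflow/logs",
}

for name, path in paths.items():
    print(name,
          "exists:", os.path.isdir(path),
          "readable:", os.access(path, os.R_OK),
          "writable:", os.access(path, os.W_OK))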

Problem: Schedules on DAGs do not execute and end up in the failed state without producing any logs. In some cases, using the "Clear" option in the DAG's tree view leads to a successful execution of the first task; every further use of Clear then leads to the execution of subsequent tasks. The tasks can be whole modules or individual functions. The Airflow Scheduler log shows no errors, and the tasks themselves produce no logs.

Steps taken:

  • Stopped the Airflow Scheduler, deleted all DAGs and their logs from the folders and from the database. Restarted the Airflow Scheduler, copied the DAGs back into the folder and waited. The DAGs reappeared in the GUI and in the database. Scheduled tasks still end up as failed in the GUI and in the database. Clearing the failure leads to a successful execution of the first task; after further clears, the first and subsequent tasks also execute successfully (and write task logs). The states recorded for these tasks can be checked directly in the metadata database, see the sketch after this list.
  • Manual execution can lead to the same behavior, depending on the DAG definition.
  • The DAGs run successfully on another Airflow test server that executes only sequentially. The log and DAG folders of the two servers are in different locations and separate from each other.
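
Since the GUI only mirrors the Airflow metadata database, the recorded task instance states can be inspected there directly. A minimal sketch with psycopg2, assuming the connection values from sql_alchemy_conn in the configuration below (localhost, port 5436, database airflow_db, user svc_etl):

import psycopg2

# Connection values assumed from sql_alchemy_conn in airflow.cfg
conn = psycopg2.connect(host="localhost", port=5436, dbname="airflow_db",
                        user="svc_etl", password="<some-hard-password>")
with conn, conn.cursor() as cur:
    # Most recent task instances and the state the scheduler recorded for them
    cur.execute("""
        SELECT dag_id, task_id, execution_date, state, hostname, operator
        FROM task_instance
        ORDER BY execution_date DESC
        LIMIT 20
    """)
    for row in cur.fetchall():
        print(row)

Task instances that were never handed to a worker usually have no hostname set, which helps separate a scheduling problem from a task execution problem.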

Question:

  • How can I find out which component of Airflow/Postgres produces this behavior?
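
One way to narrow this down is to check whether a process running as the scheduler's user can see and parse the DAG files at all, independently of the GUI. A minimal sketch against Airflow 1.10's DagBag, assuming the dags_folder from the configuration below; run it as the same account the scheduler service uses:

import os
from airflow.models import DagBag

dags_folder = "/ABTEILUNG/DWH/airflow/dags"  # dags_folder from airflow.cfg

print("folder visible:", os.path.isdir(dags_folder))
bag = DagBag(dag_folder=dags_folder)
print("DAGs parsed:", len(bag.dags))
print("import errors:", bag.import_errors)

If the folder is invisible or zero DAGs are parsed under the service account while the same script works interactively, the problem lies in how the folder is linked rather than in Airflow or Postgres, which is what the solution below eventually confirmed.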

Airflow configuration:

[core]
# The folder where your airflow pipelines live,most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /ABTEILUNG/DWH/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
# > writing logs to the GIT main folder is a very bad idea and needs to change soon!!!
base_log_folder = /ABTEILUNG/DWH/airflow/logs

# Airflow can store logs remotely in AWS S3,Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true,see UPDATING.md for additional
# configuration requirements.
remote_logging = False
remote_log_conn_id =
remote_base_log_folder =
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
# Colour the logs when the controlling terminal is a TTY.
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/logs/dag_processor_manager/dag_processor_manager.log

# Hostname by providing a path to a callable, which will resolve the hostname
# The format is "package:function". For example, default value "socket:getfqdn" means
# that result from getfqdn() of "socket" package will be used as hostname
# No argument should be required in the function specified.
# If using IP address as hostname is preferred,use value "airflow.utils.net:get_host_ip_address"
hostname_callable = socket:getfqdn

# Default timezone in case supplied date times are naive
# can be utc (default),system,or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = Europe/Berlin

# The executor class that airflow should use. Choices include
# SequentialExecutor,LocalExecutor,CeleryExecutor,DaskExecutor,KubernetesExecutor
#Default:
#executor = SequentialExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine,more information
# their website
#sql_alchemy_conn = sqlite:////home/airflow/airflow.db
#! this needs to change to MySQL or Postgres database
sql_alchemy_conn = postgresql+psycopg2://localhost:5436/airflow_db?user=svc_etl&password=<some-hard-password>

# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5

# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool, they are disconnected and discarded.
# It follows then that the total number of simultaneous connections the pool will allow
# is pool_size + max_overflow, and the total number of "sleeping" connections the pool
# will allow is pool_size.
# max_overflow can be set to -1 to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections. Defaults to 10.
sql_alchemy_max_overflow = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800

# Check connection at the start of each connection pool checkout.
# Typically, this is a simple statement like "SELECT 1".
# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
sql_alchemy_pool_pre_ping = True

# The schema to use for the metadata database
# SqlAlchemy supports databases with the concept of multiple schemas.
sql_alchemy_schema =

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
# > actually this parameter is more like max_active_tasks per Airflow state database across all processing servers
# > Default original: parallelism = 32
parallelism = 6

# The number of task instances allowed to run concurrently by the schedule
# > again more like max_active_tasks_for_worker (process)
# > Default original: dag_concurrency = 16
dag_concurrency = 4

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# The maximum number of active DAG runs per DAG
# > Default original: max_active_runs_per_dag = 16
max_active_runs_per_dag = 4

# Whether to load the examples that ship with Airflow. It's good to
# get started,but you probably want to set this to False in a production
# environment
load_examples = False

# Where your Airflow plugins are stored
plugins_folder = /home/airflow/plugins

# Secret key to save connection passwords in the db
fernet_key = 
#! this needs to be set to enable obfuscated password shown in GUI

# Whether to disable pickling dags
donot_pickle = False

# How long before timing out a python file import
dagbag_import_timeout = 30

# How long before timing out a DagFileProcessor,which processes a dag file
dag_file_processor_timeout = 120

# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner

# If set,tasks without a `run_as_user` argument will be run with this user
# Can be used to de-elevate a sudo user running Airflow when executing tasks
default_impersonation =

# What security module to use (for example kerberos):
security =

# If set to False enables some unsecure features like Charts and Ad Hoc Queries.
# In 2.0 will default to True.
secure_mode = False

# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False

# Name of handler to read task instance logs.
# Default to use task handler.
task_log_reader = task

# Whether to enable pickling for xcom (note that this is insecure and allows for
# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
enable_xcom_pickling = True

# When a task is killed forcefully,this is the amount of time in seconds that
# it has to cleanup after it is sent a SIGTERM,before it is SIGKILLED
killed_task_cleanup_time = 60

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`,the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False

# Worker initialisation check to validate Metadata Database connection
worker_precheck = False

# When discovering DAGs,ignore any files that don't contain the strings `DAG` and `airflow`.
dag_discovery_safe_mode = True

# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly,while the json_client will use the api running on the
# webserver
api_client = airflow.api.client.local_client

# If you set web_server_url_prefix,do NOT forget to append it here,ex:
# endpoint_url = http://localhost:8080/myroot
# So api will look like: http://localhost:8080/myroot/api/experimental/...
endpoint_url = http://localhost:8080

[api]
# How to authenticate users of the API
auth_backend = airflow.api.auth.backend.default

[lineage]
# what lineage backend to use
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
# The default owner assigned to each new operator,unless
# provided explicitly or passed via `default_args`
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0

[hive]
...
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080

# The ip specified when starting the web server
web_server_host = 0.0.0.0

# The port on which to run the web server
web_server_port = 8080
...
# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
#Default:web_server_master_timeout = 120
web_server_master_timeout = 300

# Number of seconds the gunicorn webserver waits before timing out on a worker
#Default:web_server_worker_timeout = 120
web_server_worker_timeout = 300

# Number of workers to refresh at a time. When set to 0,worker refresh is
# disabled. When nonzero,airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
#Default:worker_refresh_interval = 30
worker_refresh_interval = 60

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
#Default:workers = 4
workers = 2

# The worker class gunicorn should use. Choices include
# sync (default),eventlet,gevent
worker_class = sync

# Log files for the gunicorn webserver. '-' means log to stderr.
#Default:access_logfile = -
#Default:error_logfile = -
access_logfile = -
error_logfile = /home/airflow/gunicorn.err
...
# Default DAG view.  Valid values are:
# tree,graph,duration,gantt,landing_times
dag_default_view = tree

# Default DAG orientation. Valid values are:
# LR (Left->Right),TB (Top->Bottom),RL (Right->Left),BT (Bottom->Top)
dag_orientation = LR

# Puts the webserver in demonstration mode; blurs the names of Operators for
# privacy.
demo_mode = False

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
#Default:log_fetch_timeout_sec = 5
log_fetch_timeout_sec = 30

# By default,the webserver shows paused DAGs. Flip this to hide paused
# DAGs by default
hide_paused_dags_by_default = False

# Consistent page size across all listing views in the UI
page_size = 100
...
# Define the color of navigation bar
#Default:navbar_color = #007A87
navbar_color = #1C33C7

# Default dagrun to show in UI
default_dag_run_display_number = 25
...
# Default setting for wrap toggle on DAG code and TI log views.
default_wrap = False
...
[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
# If you want airflow to send emails on retries,failure,and you want to use
# the airflow.utils.email.send_email_smtp function,you have to configure an
# smtp server here
smtp_host = <valid-SMTP-server-connection>
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = 
# smtp_password = 
smtp_port = 25
smtp_mail_from = <valid-domain-address@domain>

[sentry]
...
[celery]
...
[dask]
...
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI),this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
# Default original: scheduler_heartbeat_sec = 5
scheduler_heartbeat_sec = 20

# after how much time should the scheduler terminate in seconds
# -1 indicates to run continuously (see also num_runs)
run_duration = -1

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1

# The number of seconds to wait between consecutive DAG file processing
# Default original: processor_poll_interval = 1
processor_poll_interval = 10

# after how much time (seconds) a new DAGs should be picked up from the filesystem
# Default original: min_file_process_interval = 0
min_file_process_interval = 10

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

# How often should stats be printed to the logs
print_stats_interval = 30

# If the last scheduler heartbeat happened more than scheduler_health_check_threshold ago (in seconds),
# scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 30

child_process_log_directory = /home/airflow/logs/scheduler
...
# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and
# Command Line Backfills still work,but the scheduler
# will not do scheduler catchup if this is False,
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = True

# This changes the batch size of queries in the scheduling main loop.
# If this is too high,SQL query performance may be impacted by one
# or more of the following:
#  - reversion to full table scan
#  - complexity of query predicate
#  - excessive locking
#
# Additionally,you may hit the maximum allowable query length for your db.
#
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512

# Statsd (https://github.com/etsy/statsd) integration settings
...
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2

authenticate = False

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True

[ldap]
...
[kerberos]
ccache = /tmp/airflow_krb5_ccache
# gets augmented with fqdn
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
...
[elasticsearch]
...
[kubernetes]
...
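
A side note on the empty fernet_key flagged in the [core] section above: a value can be generated with the cryptography package (the library Airflow uses for Fernet encryption), for example:

from cryptography.fernet import Fernet

# Prints a value suitable for fernet_key in the [core] section
print(Fernet.generate_key().decode())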

Addendum: If I have forgotten anything, please leave a comment and I will add it as soon as possible.

Solution

After several more hours of reconfiguring the linked directories involved, we found a solution and a suspicion of what the underlying problem was.

In this case the problem most likely came from linking the directories via autofs instead of mount. The underlying issue was most likely:

  • autofs linked with the same user as mount, which is why the manual checks succeeded
  • the linked DAG folder sat on a server in another town, in a different domain than the Airflow Scheduler server
  • the Airflow Scheduler running as that user, however, could not find the directory in the other domain via autofs (it worked via mount)

Solution used to get it running:

  • moved the DAG folder onto a linked directory located in the domain of the Airflow Scheduler server
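
The likely difference here is that autofs only mounts the share on first access, while a static mount is already in place when the scheduler service starts. A small sketch (path and user are the ones used in this question) to verify, from within the scheduler's environment, what the service account actually sees:

import os, pwd

dags_folder = "/ABTEILUNG/DWH/airflow/dags"  # linked DAG folder from airflow.cfg

print("effective user:", pwd.getpwuid(os.geteuid()).pw_name)
print("reachable:", os.path.isdir(dags_folder))
print("is a mount point:", os.path.ismount(dags_folder))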
