Monitoring Apache Flink Applications

This post originally appeared on the Apache Flink blog. It was reproduced here under the Apache License, Version 2.0.


This blog post provides an introduction to Apache Flink’s built-in monitoring and metrics system, which allows developers to effectively monitor their Flink jobs. Oftentimes, the task of picking the relevant metrics to monitor a Flink application can be overwhelming for a DevOps team that is just starting with stream processing and Apache Flink. Having worked with many organizations that deploy Flink at scale, I would like to share my experience and some best practices with the community.

With business-critical applications running on Apache Flink, performance monitoring becomes an increasingly important part of a successful production deployment. It ensures that any degradation or downtime is immediately identified and resolved as quickly as possible.

Monitoring goes hand-in-hand with observability, which is a prerequisite for troubleshooting and performance tuning. Nowadays, with the complexity of modern enterprise applications and the speed of delivery increasing, an engineering team must understand and have a complete overview of its applications’ status at any given point in time.

 

The foundation for monitoring Flink jobs is its metrics system, which consists of two components: Metrics and MetricsReporters.

 

Metrics

Flink comes with a comprehensive set of built-in metrics such as:

  • Used JVM Heap / NonHeap / Direct Memory (per Task-/JobManager)

  • Number of Job Restarts (per Job)

  • Number of Records Per Second (per Operator)

These metrics have different scopes and cover general aspects (e.g. JVM or operating system) as well as Flink-specific aspects.

As a user, you can and should add application-specific metrics to your functions. Typically these include counters for the number of invalid records or the number of records temporarily buffered in managed state. Besides counters, Flink offers additional metrics types like gauges and histograms. For instructions on how to register your own metrics with Flink’s metrics system please check out Flink’s documentation. In this blog post, we will focus on how to get the most out of Flink’s built-in metrics.
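Before moving on, here is a minimal sketch of what registering such a counter can look like; the class and metric names are placeholders, and only the registration pattern matters.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class ValidatingFlatMap extends RichFlatMapFunction<String, String> {

    private transient Counter invalidRecords;

    @Override
    public void open(Configuration parameters) {
        // Register an application-specific counter with Flink's metrics system.
        invalidRecords = getRuntimeContext()
            .getMetricGroup()
            .counter("invalidRecords");
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value == null || value.isEmpty()) {
            invalidRecords.inc(); // exposed alongside the built-in metrics
            return;
        }
        out.collect(value);
    }
}
```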

 

MetricsReporters

All metrics can be queried via Flink’s REST API. However, users can configure MetricsReporters to send the metrics to external systems. Apache Flink provides out-of-the-box reporters for the most common monitoring tools, including JMX, Prometheus, Datadog, Graphite and InfluxDB. For information about how to configure a reporter check out Flink’s MetricsReporter documentation.
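For example, a Prometheus reporter can be enabled with a few lines in flink-conf.yaml; this is only a sketch, the port below is an arbitrary choice and the flink-metrics-prometheus jar has to be available to the Flink processes.

```yaml
# flink-conf.yaml (sketch): expose all metrics to Prometheus
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
```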

In the remaining part of this blog post, we will go over some of the most important metrics to monitor your Apache Flink application.

 

Monitoring General Health

The first thing you want to monitor is whether your job is actually in a RUNNING state. In addition, it pays off to monitor the number of restarts and the time since the last restart.

Generally speaking, successful checkpointing is a strong indicator of the general health of your application. For each checkpoint, checkpoint barriers need to flow through the whole topology of your Flink job and events and barriers cannot overtake each other. Therefore, a successful checkpoint shows that no channel is fully congested.
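Of course, the checkpoint metrics below only exist if checkpointing is enabled in the first place. A minimal sketch of enabling it is shown here; the interval and timeout values are illustrative, not recommendations.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds ...
        env.enableCheckpointing(60_000L);
        // ... and consider a checkpoint failed if it does not complete within 10 minutes.
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);

        // The actual job definition (sources, transformations, sinks) would follow here.
    }
}
```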

 

Key Metrics

  • uptime (scope: job): The time that the job has been running without interruption.

  • fullRestarts (scope: job): The total number of full restarts since this job was submitted.

  • numberOfCompletedCheckpoints (scope: job): The number of successfully completed checkpoints.

  • numberOfFailedCheckpoints (scope: job): The number of failed checkpoints.

 

 

Example Dashboard Panels

Figure 1: Uptime (35 minutes), Restarting Time (3 milliseconds) and Number of Full Restarts (7)

Figure 2: Completed Checkpoints (18336), Failed (14)

 

Possible Alerts

  • ΔfullRestarts > threshold
  • ΔnumberOfFailedCheckpoints > threshold

 

Monitoring Progress & Throughput

Knowing that your application is RUNNING and checkpointing is working fine is good, but it does not tell you whether the application is actually making progress and keeping up with the upstream systems.

 

Throughput

Flink provides multiple metrics to measure the throughput of your application. For each operator or task (remember: a task can contain multiple chained operators), Flink counts the number of records and bytes going in and out. Out of those metrics, the rate of outgoing records per operator is often the most intuitive and easiest to reason about.
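As a practical aside (a sketch assuming an existing DataStream called events): giving operators explicit names makes these per-operator metrics much easier to identify, because the operator name becomes part of the metric scope.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

public final class NamedOperators {

    // With the default scope format, the names chosen here show up in metrics such as
    // <host>.taskmanager.<tm-id>.<job>.DropEmptyEvents.<subtask>.numRecordsOutPerSecond.
    public static DataStream<String> clean(DataStream<String> events) {
        return events
            .filter(value -> value != null && !value.isEmpty()).name("DropEmptyEvents")
            .map(String::trim).returns(Types.STRING).name("TrimEvents");
    }
}
```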

 

Key Metrics

  • numRecordsOutPerSecond (scope: task): The number of records this operator/task sends per second.

  • numRecordsOutPerSecond (scope: operator): The number of records this operator sends per second.

 

Example Dashboard Panels

Figure 3: Mean Records Out per Second per Operator

 

Possible Alerts

  • numRecordsOutPerSecond = 0 (for a non-Sink operator)

Note: Source operators always have zero incoming records and sink operators always have zero outgoing records because the metrics only count Flink-internal communication. There is a JIRA ticket to change this behavior.

 

Progress

For applications that use event time semantics, it is important that watermarks progress over time. A watermark of time t tells the framework that it should no longer expect to receive events with a timestamp earlier than t, and in turn triggers all operations that were scheduled for a timestamp < t. For example, an event time window that ends at t = 30 will be closed and evaluated once the watermark passes 30.

As a consequence, you should monitor the watermark at event time-sensitive operators in your application, such as process functions and windows. If the difference between the current processing time and the watermark, known as event-time skew, is unusually high, then it typically implies one of two issues. First, it could mean that you are simply processing old events, for example during catch-up after a downtime or when your job is simply not able to keep up and events are queuing up. Second, it could mean that a single upstream sub-task has not sent a watermark for a long time (for example because it did not receive any events to base the watermark on), which also prevents the watermark in downstream operators from progressing. This JIRA ticket provides further information and a workaround for the latter.
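If you additionally want to expose this event-time skew from within the job itself, a sketch of a user-defined gauge could look as follows; the function and metric names are placeholders of this sketch.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Forwards events unchanged while reporting how far the watermark lags behind processing time.
public class EventTimeLagTracker<T> extends ProcessFunction<T, T> {

    private volatile long lastSeenWatermark = Long.MIN_VALUE;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("eventTimeLagMillis", new Gauge<Long>() {
            @Override
            public Long getValue() {
                // Report 0 until the first element (and thus the first watermark) has been seen.
                return lastSeenWatermark == Long.MIN_VALUE
                        ? 0L
                        : System.currentTimeMillis() - lastSeenWatermark;
            }
        });
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        lastSeenWatermark = ctx.timerService().currentWatermark();
        out.collect(value);
    }
}
```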

 

 

Key Metrics

  • currentOutputWatermark (scope: operator): The last watermark this operator has emitted.

 

Example Dashboard Panel

Figure 4: Event Time Lag per Subtask of a single operator in the topology. In this case, the watermark is lagging a few seconds behind for each subtask.

 

Possible Alerts

  • currentProcessingTime - currentOutputWatermark > threshold

 

“Keeping Up”

When consuming from a message queue, there is often a direct way to monitor if your application is keeping up. By using connector-specific metrics you can monitor how far behind the head of the message queue your current consumer group is. Flink forwards the underlying metrics from most sources.
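As a sketch of where these metrics come from (topic name, group id and broker address below are placeholders): the Kafka connector forwards the Kafka client’s own metrics, including records-lag-max, without any additional code.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaLagExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "flink-monitoring-demo");

        // The connector forwards the underlying Kafka consumer metrics (e.g. records-lag-max),
        // which then appear under the user scope of the consuming tasks.
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .print();

        env.execute("Kafka lag example");
    }
}
```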

 

Key Metrics

  • records-lag-max (scope: user; applies to the FlinkKafkaConsumer): The maximum lag in terms of the number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.

  • millisBehindLatest (scope: user; applies to the FlinkKinesisConsumer): The number of milliseconds a consumer is behind the head of the stream. For any consumer and Kinesis shard, this indicates how far it is behind the current time.


 

Possible Alerts

  • records-lag-max > threshold

  • millisBehindLatest > threshold

 

Monitoring Latency

Generally speaking, latency is the delay between the creation of an event and the time at which results based on this event become visible. Once the event is created it is usually stored in a persistent message queue, before it is processed by Apache Flink, which then writes the results to a database or calls a downstream system. In such a pipeline, latency can be introduced at each stage and for various reasons including the following:

  1. It might take a varying amount of time until events are persisted in the message queue.

  2. During periods of high load or during recovery, events might spend some time in the message queue until they are processed by Flink (see previous section).

  3. Some operators in a streaming topology need to buffer events for some time (e.g. in a time window) for functional reasons.

  4. Each computation in your Flink topology (framework or user code), as well as each network shuffle, takes time and adds to latency.

  5. If the application emits through a transactional sink, the sink will only commit and publish transactions upon successful checkpoints of Flink, adding latency usually up to the checkpointing interval for each record.

In practice, it has proven invaluable to add timestamps to your events at multiple stages (at least at creation, persistence, ingestion by Flink, publication by Flink; possibly sampling those to save bandwidth). The differences between these timestamps can be exposed as a user-defined metric in your Flink topology to derive the latency distribution of each stage.
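A sketch of such a user-defined stage-latency metric is shown below; the histogram wrapper requires the flink-metrics-dropwizard dependency, and the extractor interface and metric name are placeholders of this sketch.

```java
import java.io.Serializable;

import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

// Passes events through unchanged while recording "now minus creation timestamp" in a histogram.
public class StageLatencyTracker<T> extends RichMapFunction<T, T> {

    /** Extracts the event's creation timestamp in epoch milliseconds. */
    public interface TimestampExtractor<T> extends Serializable {
        long extract(T event);
    }

    private final TimestampExtractor<T> extractor;
    private transient Histogram stageLatency;

    public StageLatencyTracker(TimestampExtractor<T> extractor) {
        this.extractor = extractor;
    }

    @Override
    public void open(Configuration parameters) {
        stageLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("stageLatencyMillis",
                new DropwizardHistogramWrapper(
                    new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
    }

    @Override
    public T map(T event) {
        stageLatency.update(System.currentTimeMillis() - extractor.extract(event));
        return event;
    }
}
```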

In the rest of this section, we will only consider the latency that is introduced inside the Flink topology itself (point 4 above), i.e. latency that cannot be attributed to transactional sinks or to events being buffered for functional reasons.

To this end, Flink comes with a feature called Latency Tracking. When enabled, Flink will insert so-called latency markers periodically at all sources. For each sub-task, a latency distribution from each source to this operator will be reported. The granularity of these histograms can be further controlled by setting metrics.latency.granularity as desired.

Due to the potentially high number of histograms (in particular for metrics.latency.granularity: subtask), enabling latency tracking can significantly impact the performance of the cluster. It is recommended to only enable it to locate sources of latency during debugging.
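For such a debugging session, latency tracking can be switched on per job via the ExecutionConfig as sketched below (the interval is an illustrative value); metrics.latency.granularity itself is set in flink-conf.yaml.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Emit a latency marker from every source every 30 seconds (value in milliseconds).
        // A value of 0 or less disables latency tracking again.
        env.getConfig().setLatencyTrackingInterval(30_000L);

        // The rest of the job definition would follow here.
    }
}
```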

 

Key Metrics

  • latency (scope: operator): The latency from the source operator to this operator.

  • restartingTime (scope: job): The time it took to restart the job, or how long the current restart has been in progress.

 

Example Dashboard Panel

Figure 5: Latency distribution between a source and a single sink subtask.

 

JVM Metrics

So far we have only looked at Flink-specific metrics. As long as latency & throughput of your application are in line with your expectations and it is checkpointing consistently, this is probably everything you need. On the other hand, if your job’s performance is starting to degrade, the memory consumption and CPU load of your Task- & JobManager JVMs are among the first metrics you should look at.

 

Memory

Flink reports the usage of Heap, NonHeap, Direct & Mapped memory for JobManagers and TaskManagers.

  • Heap memory - as with most JVM applications - is the most volatile and important metric to watch. This is especially true when using Flink’s filesystem state backend as it keeps all state objects on the JVM Heap. If the size of long-living objects on the Heap increases significantly, this can usually be attributed to the size of your application state (check the checkpointing metrics for an estimated size of the on-heap state). The possible reasons for growing state are very application-specific. Typically, an increasing number of keys, a large event-time skew between different input streams or simply missing state cleanup may cause growing state.

  • NonHeap memory is dominated by the metaspace, the size of which is unlimited by default and holds class metadata as well as static content. There is a JIRA ticket to limit its size to 250 megabytes by default.

  • The biggest driver of Direct memory is by far the number of Flink’s network buffers, which can be configured.

  • Mapped memory is usually close to zero as Flink does not use memory-mapped files.

In a containerized environment you should additionally monitor the overall memory consumption of the Job- and TaskManager containers to ensure they don’t exceed their resource limits. This is particularly important when using the RocksDB state backend, since RocksDB allocates a considerable amount of memory off heap. To understand how much memory RocksDB might use, you can check out this blog post by Stefan Richter.

 

Key Metrics

  • Status.JVM.Memory.NonHeap.Committed (scope: job-/taskmanager): The amount of non-heap memory guaranteed to be available to the JVM (in bytes).

  • Status.JVM.Memory.Heap.Used (scope: job-/taskmanager): The amount of heap memory currently used (in bytes).

  • Status.JVM.Memory.Heap.Committed (scope: job-/taskmanager): The amount of heap memory guaranteed to be available to the JVM (in bytes).

  • Status.JVM.Memory.Direct.MemoryUsed (scope: job-/taskmanager): The amount of memory used by the JVM for the direct buffer pool (in bytes).

  • Status.JVM.Memory.Mapped.MemoryUsed (scope: job-/taskmanager): The amount of memory used by the JVM for the mapped buffer pool (in bytes).

  • Status.JVM.GarbageCollector.G1 Young Generation.Time (scope: job-/taskmanager): The total time spent performing G1 Young Generation garbage collection.

  • Status.JVM.GarbageCollector.G1 Old Generation.Time (scope: job-/taskmanager): The total time spent performing G1 Old Generation garbage collection.

 

Example Dashboard Panel

Figure 6: TaskManager memory consumption and garbage collection times.

Figure 7: JobManager memory consumption and garbage collection times.

 

Possible Alerts

  • container memory limit < container memory + safety margin

 

CPU

Besides memory, you should also monitor the CPU load of the TaskManagers. If your TaskManagers are constantly under very high load, you might be able to improve the overall performance by decreasing the number of task slots per TaskManager (in case of a Standalone setup), by providing more resources to the TaskManager (in case of a containerized setup), or by providing more TaskManagers. In general, a system already running under very high load during normal operations will need much more time to catch up after recovering from a downtime. During this time you will see a much higher latency (event-time skew) than usual.

A sudden increase in the CPU load might also be attributed to high garbage collection pressure, which should be visible in the JVM memory metrics as well.

If one or a few TaskManagers are constantly under very high load, this can slow down the whole topology due to long checkpoint alignment times and increasing event-time skew. A common reason is skew in the partition key of the data, which can be mitigated by pre-aggregating before the shuffle or keying on a more evenly distributed key.

 

Key Metrics

  • Status.JVM.CPU.Load (scope: job-/taskmanager): The recent CPU usage of the JVM.

 

 
Example Dashboard Panel

Figure 8: TaskManager & JobManager CPU load

 

System Resources

In addition to the JVM metrics above, it is also possible to use Flink’s metrics system to gather insights about system resources, i.e. memory, CPU & network-related metrics for the whole machine as opposed to the Flink processes alone. System resource monitoring is disabled by default and requires additional dependencies on the classpath. Please check out the Flink system resource metrics documentation for additional guidance and details. System resource monitoring in Flink can be very helpful in setups without existing host monitoring capabilities.

 

Conclusion

This post tries to shed some light on Flink’s metrics and monitoring system. You can utilise it as a starting point when you first think about how to successfully monitor your Flink application. I highly recommend starting to monitor your Flink application early on in the development phase. This way you will be able to improve your dashboards and alerts over time and, more importantly, observe the performance impact of changes to your application throughout the development phase. By doing so, you can ask the right questions about the runtime behaviour of your application, and learn much more about Flink’s internals early on.
