最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些关键的知识点/api。以窗口是flink一个重要的概念,flink提供了很多种窗口的使用方式,以下为窗口相关文档的第一部分,包含目录窗口功能中窗口折叠函数后的内容。
目录
-
窗口分配器
-
窗口功能
-
触发器
-
允许数据延迟
-
使用窗口结果
-
窗口处理函数
ProcessWindowFunction获取包含窗口所有元素的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为元素不能以递增方式聚合,而是需要在内部进行缓冲,直到认为窗口已准备好进行处理。
ProcessWindowFunction
外观的签名如下: -
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }
注意该
key
参数是通过KeySelector
为keyBy()
调用指定的密钥提取的密钥。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple
,您必须手动将其转换为正确大小的元组以提取键字段。ProcessWindowFunction
可以像这样定义和使用A : -
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }
该示例显示了
ProcessWindowFunction
对窗口中的元素进行计数的情况。此外,窗口功能将有关窗口的信息添加到输出。注意注意,使用
ProcessWindowFunction
简单的聚合(例如count)效率非常低。下一节将介绍aReduceFunction
或如何AggregateFunction
与a结合使用ProcessWindowFunction
以获得增量聚合和a的附加信息ProcessWindowFunction
。具有增量聚合的窗口处理函数
当a 元素到达窗口时,A
ProcessWindowFunction
可以与aReduceFunction
,anAggregateFunction
或a组合FoldFunction
以递增地聚合元素。当窗口关闭时,ProcessWindowFunction
将提供聚合结果。这允许它在访问附加窗口元信息的同时递增地计算窗口ProcessWindowFunction
。注意您也可以使用旧版
WindowFunction
而不是ProcessWindowFunction
增量窗口聚合。使用还原函数进行增量窗口聚合
以下示例显示了如何将增量
ReduceFunction
与a组合ProcessWindowFunction
以返回窗口中的最小事件以及窗口的开始时间。 -
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }
使用聚合函数进行增量窗口聚合
以下示例显示如何将增量
AggregateFunction
与a组合ProcessWindowFunction
以计算平均值,并同时发出键和窗口以及平均值。 -
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .timeWindow(<duration>) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions /** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { public void process(String key, Context context, Iterable<Double> averages, Collector<Tuple2<String, Double>> out) { Double average = averages.iterator().next(); out.collect(new Tuple2<>(key, average)); } }
使用折叠函数进行增量窗口聚合
以下示例显示了如何将增量
FoldFunction
与a组合ProcessWindowFunction
以提取窗口中的事件数,并返回窗口的键和结束时间。 -
DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .timeWindow(<duration>) .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) // Function definitions private static class MyFoldFunction implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > { public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) { Integer cur = acc.getField(2); acc.setField(cur + 1, 2); return acc; } } private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { public void process(String key, Context context, Iterable<Tuple3<String, Long, Integer>> counts, Collector<Tuple3<String, Long, Integer>> out) { Integer count = counts.iterator().next().getField(2); out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count)); } }
窗口处理函数状态
除了访问键控状态(如任何丰富的函数可以)之外,
ProcessWindowFunction
还可以使用键控状态,该键控状态的作用域是函数当前正在处理的窗口。在这种情况下,了解每个窗口状态所指的窗口是很重要的。涉及不同的“窗口”: -
指定窗口操作时定义的窗口:这可能是1小时的翻滚窗口或滑动1小时的2小时滑动窗口。
-
给定键的已定义窗口的实际实例:对于user-id xyz,这可能是从12:00到13:00的时间窗口。这基于窗口定义,并且将基于作业当前正在处理的键的数量以及基于事件落入的时隙而存在许多窗口。
-
每窗口状态与后两者相关联。这意味着如果我们处理1000个不同键的事件,并且所有这些事件的事件当前都落入[12:00,13:00]时间窗口,那么将有1000个窗口实例,每个窗口实例都有自己的键控每窗口状态。
调用接收的
Context
对象有两种方法process()
允许访问两种类型的状态: -
globalState()
,允许访问没有作用于窗口的键控状态 -
windowState()
,允许访问也限定在窗口范围内的键控状态 -
如果您预计同一窗口会发生多次触发,则此功能非常有用,如果您迟到的数据延迟触发或者您有自定义触发器进行投机性早期触发时可能会发生这种情况。在这种情况下,您将存储有关先前点火的信息或每个窗口状态的点火次数。
使用窗口状态时,清除窗口时清除该状态也很重要。这应该在
clear()
方法中发生。窗口函数(遗产)
在一些
ProcessWindowFunction
可以使用的地方你也可以使用WindowFunction
。这是较旧版本ProcessWindowFunction
,提供较少的上下文信息,并且没有一些高级功能,例如每窗口键控状态。此接口将在某个时候弃用。WindowFunction
外观的签名如下: -
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception; }
它可以像这样使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
触发器
A Trigger
确定何时窗口功能准备好处理窗口(由窗口分配器形成)。每个都有一个默认值。如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssigner``Trigger``trigger(...)
触发器接口有五种方法可以Trigger
对不同的事件做出反应:
-
onElement()
为添加到窗口的每个元素调用该方法。 -
onEventTime()
在注册的事件时间计时器触发时调用该方法。 -
onProcessingTime()
在注册的处理时间计时器触发时调用该方法。 -
该
onMerge()
方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。 -
最后,该
clear()
方法在移除相应窗口时执行所需的任何动作。
关于上述方法需要注意两点:
1)前三个决定如何通过返回a来对其调用事件进行操作TriggerResult
。该操作可以是以下之一:
-
CONTINUE
: 没做什么, -
FIRE
:触发计算, -
PURGE
:清除窗口中的元素,和 -
FIRE_AND_PURGE
:触发计算并清除窗口中的元素。
2)这些方法中的任何一种都可用于为将来的操作注册处理或事件时间计时器。
触发与清除
一旦触发器确定窗口已准备好进行处理,它就会触发,即它返回FIRE
或FIRE_AND_PURGE
。这是窗口操作员发出当前窗口结果的信号。给定一个窗口,将ProcessWindowFunction
所有元素传递给ProcessWindowFunction
(可能在将它们传递给逐出器后)。窗户ReduceFunction
,AggregateFunction
或FoldFunction
简单地发出他们急切地汇总结果。
当触发器触发时,它可以FIRE
或者FIRE_AND_PURGE
。虽然FIRE
保持了窗口的内容,FIRE_AND_PURGE
删除其内容。默认情况下,预先实现的触发器只是在FIRE
不清除窗口状态的情况下。
注意清除将简单地删除窗口的内容,并将保留有关窗口和任何触发状态的任何潜在元信息。
WindowAssigners的默认触发器
默认值Trigger
a WindowAssigner
适用于许多用例。例如,所有事件时间窗口分配器都具有EventTimeTrigger
默认触发器。一旦水印通过窗口的末端,该触发器就会触发。
注意默认触发器GlobalWindow
是NeverTrigger
从不触发的。因此,在使用时必须定义自定义触发器GlobalWindow
。
注意通过使用trigger()
您指定触发器会覆盖a的默认触发器WindowAssigner
。例如,如果指定a CountTrigger
,TumblingEventTimeWindows
则不再根据时间进度获取窗口激活,而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。
内置和自定义触发器
Flink附带了一些内置触发器。
-
(已经提到的)
EventTimeTrigger
基于水印测量的事件时间的进展而发生火灾。 -
在
ProcessingTimeTrigger
基于处理时间的火灾。 -
CountTrigger
一旦窗口中的元素数量超过给定限制,就会触发。 -
将
PurgingTrigger
另一个触发器作为参数作为参数并将其转换为清除触发器。
如果需要实现自定义触发器,则应该检查抽象 Trigger类。请注意,API仍在不断发展,可能会在Flink的未来版本中发生变化。
逐出器
Flink的窗口模型允许指定Evictor
除了WindowAssigner
和之外的可选项Trigger
。这可以使用evictor(...)
方法完成(如本文档开头所示)。所述逐出器必须从一个窗口中删除元素的能力之后触发器触发和之前和/或之后被施加的窗口函数。为此,该Evictor
接口有两种方法:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
在evictBefore()
包含窗口函数之前被施加驱逐逻辑,而evictAfter()
包含窗口函数之后要施加的一个。在应用窗口函数之前被逐出的元素将不会被它处理。
Flink附带三个预先实施的驱逐者。这些是:
-
CountEvictor
:从窗口保持用户指定数量的元素,并从窗口缓冲区的开头丢弃剩余的元素。 -
DeltaEvictor
:取aDeltaFunction
和athreshold
,计算窗口缓冲区中最后一个元素与其余每个元素之间的差值,并删除delta大于或等于阈值的值。 -
TimeEvictor
:以interval
毫秒为单位作为参数,对于给定窗口,它查找max_ts
其元素中的最大时间戳,并删除时间戳小于的所有元素max_ts - interval
。
默认默认情况下,所有预先实现的evictors在窗口函数之前应用它们的逻辑。
注意指定逐出器会阻止任何预聚合,因为在应用计算之前,必须将窗口的所有元素传递给逐出器。
注意 Flink不保证窗口内元素的顺序。这意味着尽管逐出器可以从窗口的开头移除元素,但这些元素不一定是首先到达或最后到达的元素。
允许数据延迟
当使用事件时间窗口时,可能会发生数据延迟的情况,即 Flink用于跟踪事件时间进度的水印已经超过元素所属的窗口的结束时间戳。查看活动时间,特别是后期元素,以便更全面地讨论Flink如何处理活动时间。
默认情况下,当水印超过窗口末尾时,会删除延迟元素。但是,Flink允许为窗口运算符指定最大允许延迟。允许延迟指定元素在被删除之前可以延迟多少时间,并且其默认值为0.在水印已经过了窗口结束但在它通过窗口结束加上允许的延迟之后到达的元素,仍然被添加到窗口中。根据所使用的触发器,延迟但未丢弃的元素可能会导致窗口再次触发。就是这种情况EventTimeTrigger
。
为了使这项工作,Flink保持窗口的状态,直到他们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,如“ 窗口生命周期”部分中所述。
默认默认情况下,允许的延迟设置为 0
。也就是说,到达水印后面的元素将被丢弃。
您可以指定允许的延迟,如下所示:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
注意使用GlobalWindows
窗口分配器时,没有数据被认为是迟到的,因为全局窗口的结束时间戳是Long.MAX_VALUE
。
将废弃数据作为副输出
使用Flink的侧输出功能,您可以获得最近丢弃的数据流。
首先需要指定您希望sideOutputLateData(OutputTag)
在窗口流上使用延迟数据。然后,您可以在窗口操作的结果上获取侧输出流:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
延迟元素考虑因素
当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。在这些情况下,当一个迟到但未掉落的元素到达时,它可能触发另一个窗口的射击。这些射击被称为late firings
,因为它们是由晚期事件触发的,与之相反的main firing
是窗口的第一次射击。在会话窗口的情况下,后期点火可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。
注意您应该知道,后期触发发出的元素应被视为先前计算的更新结果,即,您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对其进行重复数据删除。
使用窗口结果
窗口化操作的结果也是a DataStream
,没有关于窗口操作的信息保留在结果元素中,因此如果要保留关于窗口的元信息,则必须在您的结果元素中手动编码该信息 ProcessWindowFunction
。在结果元素上设置的唯一相关信息是元素时间戳。这被设置为已处理窗口的最大允许时间戳,即结束时间戳-1,因为窗口结束时间戳是独占的。请注意,事件时间窗口和处理时间窗口都是如此。即,在窗口化操作元素之后始终具有时间戳,但这可以是事件时间时间戳或处理时间时间戳。对于处理时间窗口,这没有特别的含义,但对于事件时间窗口,这与水印与窗口交互的方式一起使得能够以相同的窗口大小进行 连续的窗口操作。在看了水印如何与窗口交互后,我们将介绍这一点。
水印和窗户的互动
在继续本节之前,您可能需要查看有关 事件时间和水印的部分。
当水印到达窗口操作符时,会触发两件事:
-
水印触发计算所有窗口,其中最大时间戳(即 结束时间戳-1)小于新水印
-
水印被转发(按原样)到下游操作
直观地,水印“冲出”任何窗口,一旦接收到该水印,将在下游操作中被认为是迟到的。
连续窗口操作
如前所述,计算窗口结果的时间戳的方式以及水印与窗口交互的方式允许将连续的窗口操作串联在一起。当您想要执行两个连续的窗口操作时,这可能很有用,您希望使用不同的键,但仍希望来自同一上游窗口的元素最终位于同一下游窗口中。考虑这个例子:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
在该示例中,[0, 5)
来自第一操作的时间窗口的结果也将[0, 5)
在随后的窗口化操作中的时间窗口中结束。这允许计算每个键的和,然后在第二个操作中计算同一窗口内的前k个元素。
关于窗口使用大小的考虑因素
Windows可以在很长一段时间内(例如几天,几周或几个月)定义,因此可以累积非常大的状态。在估算窗口计算的存储要求时,需要记住几条规则:
-
Flink为每个窗口创建一个每个元素的副本。鉴于此,翻滚窗口保留每个元素的一个副本(一个元素恰好属于一个窗口,除非它被延迟)。相反,滑动窗口会创建每个元素的几个,如“ 窗口分配器”部分中所述。因此,尺寸为1天且滑动1秒的滑动窗口可能不是一个好主意。
-
ReduceFunction
,AggregateFunction
并且FoldFunction
可以显着降低存储要求,因为它们急切地聚合元素并且每个窗口只存储一个值。相反,仅使用aProcessWindowFunction
需要累积所有元素。 -
使用an
Evictor
可以防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过逐出器传递(参见Evictors)
原文连接https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html
原文地址:https://blog.csdn.net/jyj1100/article/details/88143228
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。