Flink原理与实现:Window的实现原理

在这里插入图片描述

在阅读本文之前,请先阅读Flink 原理与实现:Window机制,这篇文章从用户的角度,对Window做了比较详细的分析,而本文主要是从Flink框架的实现层面,对Window做另一个角度的分析。

首先看一个比较简单的情况,假设我们在一个KeyedStream上做了一个10秒钟的tumbling processing time window,也就是说,每隔10秒钟窗口会触发一次,即:

  dataStream.keyBy(0).timeWindow(Time.seconds(10)).sum(1);

在研究源码之前,我们可以在脑子里大概想象一下它应该会怎么处理:

  1. 给定一条数据,会给这条数据assign windows,在这个例子中,因为是翻滚窗口,所以只会assign出一个窗口。
  2. assign了窗口之后,我们就知道这条消息对应的窗口起始时间,这里比较重要的是窗口的右边界,即窗口过期时间。
  3. 我们可以在窗口的过期时间之上,注册一个Scheduled Future,到时间之后让它自动回调窗口的聚合函数,即触发窗口内的数据计算。

上面的第3步,针对KeyedStream,需要再扩展一下,针对一条数据,我们应该注册一个基于Key + Window的Scheduled Future,到时间之后只触发对于这个key的数据计算。当然,这里我们不自觉地会想,当key的数目非常大的时候,可能会创建出大量的Future,这会是一个问题。

脑子中大致有上面的思路之后,我们来看一下Flink的实现。

首先,KeyedStream.timeWindow方法会生成一个WindowedStream,sum则是我们的aggregator,因此在WindowedStream中,实际调用了aggregate(new SumAggregator(...)),然后一层层调下来,目标就是生成一个WindowOperator。

在Flink中,根据是否配置了evictor,会生成两种不同的WindowOperator:

  • 有evictor:这种情况下意味着用户需要精确控制如何evict窗口的元素,因此所有的数据都会先缓存起来。此时会生成EvictingWindowOperator
  • 无evictor:这种情况下,通过指定一个reduce function,来一条数据就会进行reduce,当到达窗口边界之后,直接输出结果就可以了。此时会生成WindowOperator

不管哪一种operator,都需要指定两个function:

  • window function:控制如何处理窗口内的元素结果
  • reduce function:控制如何对窗口内元素做聚合

当一个窗口被fire的时候,就需要通过window function来控制如何处理窗口的结果了。比如PassThroughWindowFunction啥也不做,对每一条结果都直接调用out.collect发送到下游;而ReduceApplyWindowFunction则在这个时候针对窗口所有元素,调用reduce function进行聚合计算,再将计算的结果发射出去。

在上面的例子中,由于没有指定evictor,因此会生成WindowOperator,它的window function为InternalSingleValueWindowFunction,它提供了对PassThroughWindowFunction的代理。而reduce function则用于构造StateDescriptor:

            ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
                reduceFunction,
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

也就是说,对于sum这种情况,每来一条消息,都会调用reduce function,然后更新reducing state,最后窗口被触发的时候,直接通过PassThroughWindowFunction输出reducing state的结果。

接下来看一下WindowOperator.processElement方法:

     // 给元素分配窗口
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
    <span class="hljs-keyword">if</span> (windowAssigner <span class="hljs-keyword">instanceof</span> MergingWindowAssigner) {
        <span class="hljs-comment">// session window的处理逻辑</span>
        <span class="hljs-comment">// ...</span>
  } <span class="hljs-keyword">else</span> {
    <span class="hljs-comment">// 遍历每一个窗口</span>
        <span class="hljs-keyword">for</span> (W window: elementWindows) {
            <span class="hljs-comment">// 如果窗口已经过期,直接忽略</span>
            <span class="hljs-keyword">if</span> (isWindowLate(window)) {
                <span class="hljs-keyword">continue</span>;
            }
            isSkippedElement = <span class="hljs-keyword">false</span>;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

       <span class="hljs-comment">// 窗口被触发了</span>
            <span class="hljs-keyword">if</span> (triggerResult.isFire()) {
                ACC contents = windowState.get();
                <span class="hljs-keyword">if</span> (contents == <span class="hljs-keyword">null</span>) {
                    <span class="hljs-keyword">continue</span>;
                }
                <span class="hljs-comment">// 输出窗口结果</span>
                emitWindowContents(window, contents);
            }

       <span class="hljs-comment">// 如果窗口需要purge,则清理状态</span>
            <span class="hljs-keyword">if</span> (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }</code></pre>

可以看到,大致的逻辑还是非常简单明了的,关键在于这一行:

    TriggerResult triggerResult = triggerContext.onElement(element);

这里针对不同的window,会有不同的trigger。其中ProcessingTime的都是ProcessingTimeTrigger。看下它的onElement方法:

        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;

可以看到,onElement方法始终返回TriggerResult.CONTINUE这个结果,不会触发窗口的fire操作。那么重点就是第一行了,它实际调用了WindowOperator.registerProcessingTimeTimer方法:

    internalTimerService.registerProcessingTimeTimer(window, time);

这是一个InternalTimerService对象,在WindowOperator.open方法中被创建:

    internalTimerService =
                getInternalTimerService("window-timers", windowSerializer, this);

它通过InternalTimeServiceManager.getInternalTimeService获取:

        HeapInternalTimerService<K, N> timerService = timerServices.get(name);
        if (timerService == null) {
            timerService = new HeapInternalTimerService<>(totalKeyGroups,
                localKeyGroupRange, keyContext, processingTimeService);
            timerServices.put(name, timerService);
        }
        timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
        return timerService;

即创建了一个HeapInternalTimerService实例。

看一下这个类的成员,这里省掉了序列化和反序列化相关的成员变量:

  // 目前使用SystemProcessingTimeService,包含了窗口到期回调线程池
    private final ProcessingTimeService processingTimeService;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> KeyContext keyContext;

<span class="hljs-comment">/**
 * Processing time相关的timer
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> Set&lt;InternalTimer&lt;K, N&gt;&gt;[] processingTimeTimersByKeyGroup;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> PriorityQueue&lt;InternalTimer&lt;K, N&gt;&gt; processingTimeTimersQueue;

<span class="hljs-comment">/**
 * Event time相关的timer
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> Set&lt;InternalTimer&lt;K, N&gt;&gt;[] eventTimeTimersByKeyGroup;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> PriorityQueue&lt;InternalTimer&lt;K, N&gt;&gt; eventTimeTimersQueue;

<span class="hljs-comment">/**
 * 当前task的key-group range信息
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> KeyGroupsList localKeyGroupRange;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> <span class="hljs-keyword">int</span> totalKeyGroups;
<span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> <span class="hljs-keyword">int</span> localKeyGroupRangeStartIdx;

<span class="hljs-comment">/**
 * 当前task的watermark,针对event time
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">long</span> currentWatermark = Long.MIN_VALUE;

<span class="hljs-comment">/**
 * 最接近的未被触发的窗口的Scheduled Future
 * */</span>
<span class="hljs-keyword">private</span> ScheduledFuture&lt;?&gt; nextTimer;

// 窗口的回调函数,如果是WindowOperator,则会根据时间类型,回调WindowOperator.onEventTime或onProcessingTime方法
private Triggerable<K, N> triggerTarget;

可以看到,存储窗口timer的数据结构processingTimeTimersByKeyGroup或者eventTimeTimersByKeyGroup,跟存储state的数据结构很像,也是根据当前task的key group range,创建一个数组。每一个key都会落到数组的一个下标,这个数组元素值是一个Set<InternalTimer<K,N>>,即Key + Window作为这个集合的key。

此外,还有一个processingTimeTimersQueue或者eventTimeTimersQueue,这是一个优先队列,会存储所有的 Key + Window的timer,主要作用就是用于快速取出最接近的未被触发的窗口。

接下来看下这个类的registerProcessingTimeTimer方法:

    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    <span class="hljs-comment">// 获取数组下标下的Timer Set</span>
    Set&lt;InternalTimer&lt;K, N&gt;&gt; timerSet = getProcessingTimeTimerSetForTimer(timer);
    <span class="hljs-comment">// 判断是否已经添加过这个timer</span>
    <span class="hljs-keyword">if</span> (timerSet.add(timer)) {
        InternalTimer&lt;K, N&gt; oldHead = processingTimeTimersQueue.peek();
        <span class="hljs-keyword">long</span> nextTriggerTime = oldHead != <span class="hljs-keyword">null</span> ? oldHead.getTimestamp() : Long.MAX_VALUE;

        processingTimeTimersQueue.add(timer);

        <span class="hljs-comment">// 如果新添加的timer的窗口触发时间早于nextTimer,则取消nextTimer的触发,</span>
        <span class="hljs-keyword">if</span> (time &lt; nextTriggerTime) {
            <span class="hljs-keyword">if</span> (nextTimer != <span class="hljs-keyword">null</span>) {
                nextTimer.cancel(<span class="hljs-keyword">false</span>);
            }
            <span class="hljs-comment">// 注册新添加的timer回调</span>
            nextTimer = processingTimeService.registerTimer(time, <span class="hljs-keyword">this</span>);
        }
    }
}</code></pre>

processingTimeService.registerTimer(time, this)注册回调,会调用SystemProcessingTimeService.registerTimer,这个方法很简单,会计算出当前时间跟窗口边界的delay,然后通过ScheduledExecutorService注册一个定时的回调,其中target为HeapInternalTimerService本身。

举例来说,当前时间为 2017-06-15 19:00:01,来了一条消息,那么它被assign的窗口为[2017-06-15 19:00:00, 2017-06-15 19:00:10),计算出来的delay为9秒,因此在9秒之后,会触发HeapInternalTimerService.onProcessingTime方法。

看下这个方法的代码:

    public void onProcessingTime(long time) throws Exception {
        nextTimer = null;
        InternalTimer<K, N> timer;
    <span class="hljs-keyword">while</span> ((timer = processingTimeTimersQueue.peek()) != <span class="hljs-keyword">null</span> &amp;&amp; timer.getTimestamp() &lt;= time) {

        Set&lt;InternalTimer&lt;K, N&gt;&gt; timerSet = getProcessingTimeTimerSetForTimer(timer);

        timerSet.remove(timer);
        processingTimeTimersQueue.remove();

        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }

    <span class="hljs-keyword">if</span> (timer != <span class="hljs-keyword">null</span>) {
        <span class="hljs-keyword">if</span> (nextTimer == <span class="hljs-keyword">null</span>) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), <span class="hljs-keyword">this</span>);
        }
    }
}</code></pre>

这个方法也比较简单,根据当前的窗口边界,从processingTimeTimersQueue这个队列中一个个取timer,只要timer所对应的窗口边界<=当前窗口边界时间,就将timer从timer set中删除,并调用triggerTarget.onProcessingTime(timer)触发该窗口。最后设置并注册nextTimer,即下一个最接近的窗口的回调。

对于KeyedStream下的窗口,实际上的情况是,在同一个窗口到达的多个不同的key,实际上窗口的边界都是相同的,所以当一个key的窗口被触发,同时也会触发该Task上所有key group range的窗口。


看了对Processing Time的处理,接下来看看Event time的情况。

event time跟processing time稍有不同,因为它的窗口触发时间,会依赖于watermark,并不是确定的(会有一个最迟的触发时间)。只要当watermark越过当前窗口的边界,这个窗口就可以被触发。因此它并没有使用nextTimer这个变量来注册和标识一个最接近的未被触发的窗口。

注册event time的timer时也比较简单,只是同时往timerSet和eventTimeTimersQueue中添加timer。

另外,Event Time的trigger使用的是EventTimeTrigger,它的onElement方法如下:

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

可以看到,每处理一个元素,都会拿当前元素所属窗口的max timestamp去跟当前的watermark对比,如果小于watermark,说明已经越过窗口的边界,则fire该窗口。

ctx.getCurrentWatermark()方法实际调用的是WindowOperator.WindowContext.getCurrentWatermark方法,返回的是HeapInternalTimerService的currentWatermark。

那么,watermark是在哪里被更新的呢?在HeapInternalTimerService.advanceWatermark方法中。代码如下:

        currentWatermark = time;
    InternalTimer&lt;K, N&gt; timer;

    <span class="hljs-keyword">while</span> ((timer = eventTimeTimersQueue.peek()) != <span class="hljs-keyword">null</span> &amp;&amp; timer.getTimestamp() &lt;= time) {

        Set&lt;InternalTimer&lt;K, N&gt;&gt; timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();

        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }</code></pre>

可以看到,这个方法不仅更新了当前watermark,而且会用触发processing time window很接近的逻辑,来触发event time window。

这个方法在AbstractStreamOperator.processWatermark方法中被调用。processWatermark则在StreamInputProcessor中的ForwardingValveOutputHandler.handleWatermark方法中被调用。一个自下往上的反向调用链如下:

HeapInternalTimerService.advanceWatermark
  <-- AbstractStreamOperator.processWatermark
  <-- StreamInputProcessor.StatusWatermarkValve.handleWatermark
  <-- StatusWatermarkValve.inputWatermark
  <-- StreamInputProcessor.processInput

这样,当一个processor收到的消息是一个watermark的时候,就会更新time service中的watermark。这里也可以看到,对于普通的用户消息,是不会主动更新watermark的。因此在Flink中,如果要使用Event Time,就必须实现一个发射watermark的策略:

  • 要么数据源自己会发送watermark(实际情况中不大可能,除非用户基于特定数据源自行封装)
  • 要么实现TimestampAssigner接口,定时往下游发送watermark。这还是有一定的限制的。

还有一个需要考虑的问题是,一个Task可能接受到上游多个channel的输入,每个channel都会有watermark,但是每个channel的进度是不一样的,这个时候该如何选择和计算?举例来说,如果有一个消费TT或kafka的Task,它会同时消费多个partition,每个partition的消费进度不一样,当我们需要获取到当前task的watermark的时候,应该取哪个值?

逻辑上来说,应该是取所有partition中最小的值。因为按照watermark的定义,这个值表示上游的数据中,已经没有比它更小的了。那么看一下Flink是如何做的,在StatusWatermarkValve.inputWatermark方法中:

        if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
            long watermarkMillis = watermark.getTimestamp();
        <span class="hljs-comment">// 如果当前输入的watermark小于该channel的watermark,直接忽略</span>
        <span class="hljs-keyword">if</span> (watermarkMillis &gt; channelStatuses[channelIndex].watermark) {
            channelStatuses[channelIndex].watermark = watermarkMillis;

            <span class="hljs-comment">// 标识当前channel的watermark已经检查过</span>
            <span class="hljs-keyword">if</span> (!channelStatuses[channelIndex].isWatermarkAligned &amp;&amp; watermarkMillis &gt;= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = <span class="hljs-keyword">true</span>;
            }

            <span class="hljs-comment">// 取所有channel的watermark最小值并调用handleWatermark方法</span>
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }</code></pre>

的确也是这么做的。

最后总结一下,Flink的window,很灵活很强大,不过在有的时候还是会有一些问题:

  1. 当key的规模很大,而任务的并发不高的时候,会存大大量的timer对象,会消耗掉不少的内存。
  2. watermark需要用户来定义实现,实现得不好很容易会得出错误的窗口计算结果,这点不太方便。
  3. watermark的计算策略过于单一,目前只能取各channel的最小值,用户无法自定义这一块的逻辑。

原文地址:https://blog.csdn.net/u013411339/article/details/89430362

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的处理数据是固定的离线计算是有延时的,T+1离线计算是数据处理完输出结果,只是输出最终结果离线计算相对可以处理复杂的计算1.2实时计算实时计算是实时的处理数据,数据从流入到计算出结果延迟低实时计算是输
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能:新增工作流新增OceanBaseSQL新增Flinkjar任务数据同步、实时采集支持脏数据管理HiveUDF控制台UI升级租户绑定简化新版本的使用文档已在社区中推送,大家可以随时下载查阅,欢迎大家体验新版本功能
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽。本文主要记录一下Java使用Flink的简单例子。首先,去官网下载Flink的zip包(链接就不提供了,你已经是个成熟的程序员了,该有一定的搜索能力了),解压后放到你想放的地方。进入主目录后,是这样子的 image.png你可以简
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasreproducedhereunderthe ApacheLicense,Version2.0.ThisblogpostprovidesanintroductiontoApacheFlink’sbuilt-inmonitoringandmetricssystem,thatallowsdeveloperstoeffectively
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下的flink-conf.yaml:经过初步的调整,大约有以下模块的参数(未优化)LicensedtotheApacheSoftwareFoundation(ASF)underoneormorecontributorlicenseagreements.SeetheNOTICEfiledistributedwiththis
1.mac平台安装flink(默认最新版)brewinstallapache-flink安装结果:Version1.7.1,commitID:89eafb42.jdk版本,我尝试使用了Java8和Java11,都能兼容3.在flink的安装目录下,启动flink目录一般默认在/usr/local/Cellar/apache-flink/1.7.1/(查找flink安装目录:find/-name
课程目标:学完该课程大家会对Flink有非常深入的了解,同时可以体会到Flink的强大之处,以及可以结合自己公司的业务进行使用,减少自己研究和学习Flink的时间。适合人群:适合有大数据开发基础和flink基础的同学。在开始学习前给大家说下什么是Flink? 1.Flink是一个针对流数据和批数据的
本文主要研究一下flink的NetworkEnvironmentConfigurationNetworkEnvironmentConfigurationflink-1.7.2/flink-runtime/src/main/java/org/apache/flinkuntimeaskmanager/NetworkEnvironmentConfiguration.javapublicclassNetworkEnvironmentCon
January22,2019 UseCases, ApacheFlinkLasseNedergaard   Recentlytherehasbeensignificantdiscussionaboutedgecomputingasamajortechnologytrendin2019.Edgecomputingbrings computingcapabilitiesawayfromthecloud,andrathercloset
1DataStreamAPI1.1DataStreamDataSources   source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。   flink提供了大量的已经实现好的source方法,可以自定义source   通过实现sourceFunction接口来
基于Flink流处理的动态实时亿级全端用户画像系统课程下载:https://pan.baidu.com/s/1YtMs-XG5-PsTFV9_7-AlfA提取码:639m项目中采用到的算法包含LogisticRegression、Kmeans、TF-IDF等,Flink暂时支持的算法比较少,对于以上算法,本课程将手把手带大家用Flink实现,并且结合真实场景,
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
 flink集群安装部署 standalone集群模式 必须依赖必须的软件JAVA_HOME配置flink安装配置flink启动flink添加Jobmanageraskmanager实例到集群个人真实环境实践安装步骤 必须依赖必须的软件flink运行在所有类unix环境中,例如:linux、mac、或
1Flink的前世今生(生态很重要)很多人可能都是在2015年才听到Flink这个词,其实早在2008年,Flink的前身已经是柏林理工大学一个研究性项目,在2014被Apache孵化器所接受,然后迅速地成为了ASF(ApacheSoftwareFoundation)的顶级项目之一。   ApacheFlinkisanopensource
序本文主要研究一下flink的CsvTableSourceTableSourceflink-table_2.11-1.7.1-sources.jar!/org/apache/flinkable/sources/TableSource.scalatraitTableSource[T]{/**Returnsthe[[TypeInformation]]forthereturntypeoft
原文链接JobManager高可用性(HA)  作业管理器JobManager协调每个Flink部署组件,它负责调度以及资源管理。  默认情况下,每个Flink集群只有一个独立的JobManager实例,因此可能会产生单点故障(SPOF)。  使用JobManagerHighAvailability,可以从JobManager的故障中恢复,从而消除SPOF。
一、背景在flink本地环境安装完成之后,就想着怎么能调试和运行一个flink示例程序,本文记录下过程。二、获取flink源码通过如下命令,获取flink源码,在源码中有flink-examples模块,该模块中包含简单的SocketWindowWordCount.java示例程序。gitclonehttps://github.com/apache/
作为一家创新驱动的科技公司,袋鼠云每年研发投入达数千万,公司80%员工都是技术人员,袋鼠云产品家族包括企业级一站式数据中台PaaS数栈、交互式数据可视化大屏开发平台Easy[V]等产品也在迅速迭代。在进行产品研发的过程中,技术小哥哥们能文能武,不断提升产品性能和体验的同时,也把这些提
在阅读本文之前,请先阅读Flink原理与实现:Window机制,这篇文章从用户的角度,对Window做了比较详细的分析,而本文主要是从Flink框架的实现层面,对Window做另一个角度的分析。首先看一个比较简单的情况,假设我们在一个KeyedStream上做了一个10秒钟的tumblingprocessingtimewindow