[源码解析] Flink UDAF 背后做了什么

[源码解析] Flink UDAF 背后做了什么

0x00 摘要

本文涉及到Flink SQL UDAF,Window 状态管理等部分,希望能起到抛砖引玉的作用,让大家可以借此深入了解这个领域。

0x01 概念

1.1 概念

大家知道,Flink的自定义聚合函数(UDAF)可以将多条记录聚合成1条记录,这功能是通过accumulate方法来完成的,官方参考指出:

在系统运行过程中,底层runtime代码会把历史状态accumulator,和您指定的上游数据(支持任意数量,任意类型的数据)作为参数,一起发送给accumulate计算。

但是实时计算还有一些特殊的场景,在此场景下,还需要提供merge方法才能完成。

在实时计算中一些场景需要merge,例如session window。 由于实时计算具有out of order的特性,后输入的数据有可能位于2个原本分开的session中间,这样就把2个session合为1个session。此时,需要使用merge方法把多个accumulator合为1个accumulator。

1.2 疑问

之前因为没亲身操作,所以一直忽略merge的特殊性。最近无意中看到了一个UDAF的实现,突然觉得有一个地方很奇怪,即 accumulate 和 merge 这两个函数不应该定义在一个类中。因为这是两个完全不同的处理方法。应该定义在两个不同的类中。

比如用UDAF做word count,则:

  • accumulate 是在一个task中累积数字,其实就相当于 map;
  • merge 是把很多task的结果再次累积起来,就相当于 reduce;

然后又想出了一个问题:Flink是如何管理 UDAF的accumulator?其状态存在哪里?

看起来应该是Flink在背后做了一些黑魔法,把这两个函数从一个类中拆分了。为了验证我们的推测,让我们从源码入手来看看这些问题:

  • Flink SQL转换/执行计划生成阶段,如何处理在 "同一个类中" 的不同类型功能函数 accumulate 和 merge?
  • Flink runtime 如何处理 merge?
  • Flink runtime 如何处理 UDAF的accumulator的历史状态?

1.3 UDAF示例代码

示例代码摘要如下 :

public class CountUdaf extends AggregateFunction<Long,CountUdaf.CountAccum> {
    //定义存放count UDAF状态的accumulator的数据的结构。
    public static class CountAccum {
        public long total;
    }
  
    //初始化count UDAF的accumulator。
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }
  
    //accumulate提供了,如何根据输入的数据,更新count UDAF存放状态的accumulator。
    public void accumulate(CountAccum accumulator,Object iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator,Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}

0x02 批处理

批处理相对简单,因为数据是有边界的,其逻辑比较清晰。

2.1 代码

首先给出测试代码

val input = env.fromElements(WC("hello",1),WC("hello",WC("ciao",1))

// register the DataSet as a view "WordCount"
tEnv.createTemporaryView("WordCount",input,'word,'frequency)
tEnv.registerFunction("countUdaf",new CountUdaf())

// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sqlQuery("SELECT word,countUdaf(frequency),SUM(frequency) FROM WordCount GROUP BY word")

case class WC(word: String,frequency: Long)

2.2 计划生成

DataSetAggregate.translateToPlan 中生成了执行计划。原来Flink把 SQL 语句分割成两个阶段:

  • combineGroup
  • reduceGroup

于是我们推断,这很有可能就是 combineGroup 调用accumulate,reduceGroup 调用 merge

关于combineGroup,如果有兴趣,可以看看我之前文章 [源码解析] Flink的groupBy和reduce究竟做了什么 以及 源码解析] GroupReduce,GroupCombine 和 Flink SQL group by

override def translateToPlan(tableEnv: BatchTableEnvImpl,queryConfig: BatchQueryConfig): DataSet[Row] = {
    if (grouping.length > 0) {
      // grouped aggregation

      if (preAgg.isDefined) {
        // 执行到这里
        inputDS
          // pre-aggregation
          .groupBy(grouping: _*)
          .combineGroup(preAgg.get) // 第一阶段
          .returns(preAggType.get)
          .name(aggOpName)
          
          // final aggregation
          .groupBy(grouping.indices: _*)
          .reduceGroup(finalAgg.right.get) // 第二阶段
          .returns(rowTypeInfo)
          .name(aggOpName)
      }
    }
}

SQL语句对应的执行计划大致为:

2.3 执行

在执行看,确实对应了两个阶段。

阶段 1 确实是 GroupReduceCombineDriver 调用到了 accumulate。

//堆栈如下
accumulate:25,CountUdaf (mytest)
accumulate:-1,DataSetAggregatePrepareMapHelper$5
combine:71,DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
sortAndCombine:213,GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:188,GroupReduceCombineDriver (org.apache.flink.runtime.operators)
  
//SQL UDAF生成的代码如下  
function = {DataSetAggregatePrepareMapHelper$5@10085} 
 function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10079} "CountUdaf"
 function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10087} "LongSumAggFunction"  

阶段 2 中 GroupReduceDriver 调用到了 merge

//堆栈如下
merge:29,CountUdaf (mytest)
mergeAccumulatorsPair:-1,DataSetAggregateFinalHelper$6
reduce:71,DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
run:131,GroupReduceDriver (org.apache.flink.runtime.operators)
  
//SQL UDAF生成的代码如下   
function = {DataSetAggregateFinalHelper$6@10245} 
 function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10238} "CountUdaf"
 function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10247} "LongSumAggFunction"  

Flink对用户定义的UDAF代码分别生成了两个不同的功能类

  • DataSetAggregatePrepareMapHelper : 用于Combine阶段,调用了accumulate
  • DataSetAggregateFinalHelper :用于Reduce阶段,调用了merge

2.4 状态管理

UDAF有一个accumulator,这个会在程序运行过程中始终存在,Flink是如何管理这个accumulator呢?

GroupReduceCombineDriver类有一个成员变量 combiner,

public class GroupReduceCombineDriver<IN,OUT> implements Driver<GroupCombineFunction<IN,OUT>,OUT> {
  	private GroupCombineFunction<IN,OUT> combiner;
}

而 combiner 被赋予了 DataSetPreAggFunction 类的一个实例。

class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
  extends AbstractRichFunction{
  private var accumulators: Row = _ //这里存储历史状态
  private var function: GeneratedAggregations = _
}

Flink就是把 UDAF的accumulator 存储在 combiner.accumulators 中,我们可以看到,无论用户定义了什么类型作为 accumulator,Flink都用万能类型 Row 搞定

combiner = {DataSetPreAggFunction@10063} 
 genAggregations = {GeneratedAggregationsFunction@10070} 
 accumulators = {Row@10117} "mytest.CountUdaf$CountAccum@1e343db7,(0,false)"
 function = {DataSetAggregatePrepareMapHelper$5@10066}  // function是包含用户代码的功能类。
  function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10076} "CountUdaf" 

2.5 总结

让我们总结一下,批处理被分成两个阶段:

  • combineGroup :根据用户UDAF代码生成功能类 DataSetAggregatePrepareMapHelper,用于Combine阶段,调用了accumulate;
  • reduceGroup :根据用户UDAF代码生成功能类 DataSetAggregateFinalHelper,用于Reduce阶段,调用了 merge;

Flink在GroupReduceCombineDriver类的成员变量 combiner 中存储 accumulator历史状态。

0x03 流处理

流处理则是和批处理完全不同的世界,下面我们看看流处理背后有什么奥秘。

在流计算场景中,数据没有边界源源不断的流入的,每条数据流入都可能会触发计算,比如在进行count或sum这些操作是如何计算的呢?

  • 是选择每次触发计算将所有流入的历史数据重新计算一遍?
  • 还是每次计算都基于上次计算结果进行增量计算呢?
  • 如果选择增量计算,那么上一次的中间计算结果保存在哪里?内存?

3.1 示例代码

val query: Table = tableEnv.sqlQuery(
  """
    |SELECT
    |countUdaf(num)
    |FROM tb_num
    |GROUP BY TUMBLE(proctime,INTERVAL '10' SECOND)
   """.stripMargin)

3.2 计划生成

DataStreamGroupWindowAggregateBase.translateToPlan 函数中完成了计划生成。根据Stream的类型(是否有key),会走不同的逻辑业务。

  • WindowedStream代表了根据key分组,并且基于WindowAssigner切分窗口的数据流。所以WindowedStream都是从KeyedStream衍生而来的。在key分组的流上进行窗口切分是比较常用的场景,也能够很好地并行化(不同的key上的窗口聚合可以分配到不同的task去处理)。
  • 当在普通流(没有key)上进行窗口操作时,就要用到 AllWindowedStreamAllWindowedStream是直接在DataStream上进行windowAll(...)操作。在普通流上进行窗口操作,就势必需要将所有分区的流都汇集到单个的Task中,而这个单个的Task很显然就会成为整个Job的瓶颈。

我们的示例代码是基于Key的,所以走 WindowedStream 分支,即一个 window 中即做accumulate,又做merge

// grouped / keyed aggregation
if (grouping.length > 0) {
      // 有key,所以是 WindowedStream,我们示例走这里
      val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(...)

      val keySelector = new CRowKeySelector(grouping,inputSchema.projectedTypeInfo(grouping))
      val keyedStream = timestampedInput.keyBy(keySelector)
      val windowedStream =
        createKeyedWindowedStream(queryConfig,window,keyedStream)
          .asInstanceOf[WindowedStream[CRow,Row,DataStreamWindow]]

      val (aggFunction,accumulatorRowType) =
        AggregateUtil.createDataStreamGroupWindowAggregateFunction(...)

      windowedStream
        .aggregate(aggFunction,windowFunction,accumulatorRowType,outRowType)
        .name(keyedAggOpName)
}
// global / non-keyed aggregation
else {
      // 没有key,所以是AllWindowedStream 
      val windowFunction = AggregateUtil.createAggregationAllWindowFunction(...)

      val windowedStream =
        createNonKeyedWindowedStream(queryConfig,timestampedInput)
          .asInstanceOf[AllWindowedStream[CRow,outRowType)
        .name(nonKeyedAggOpName)
}

SQL语句对应的执行计划大致如下,我们能看出来 accumulate & merge 都在 Window 中处理。

3.3 执行 & 状态管理

可以看到,流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。

我们以基于key的WindowedStream为例继续进行研究。

3.3.1 接受到一个新输入

当Window接受到一个输入item时候,item会被分配到一个key,由KeySelector完成。WindowOperator 类首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。这里就会做accumulate

本例 windowAssigner = {TumblingProcessingTimeWindows} ,进入到processElement函数的 非 MergingWindow部分,具体流程如下:

  • 遍历elementWindows,进行业务处理
    • 1)判断该window是否已过期,isWindowLate(window)
    • 2)获取该window的context,windowState.setCurrentNamespace(window); 这里是 HeapAggregatingState。
    • 3)将数据加入,windowState.add(element.getValue());
      • 3.1)调用 stateTable.transform();处理输入
        • 3.1.1)StateMap<K,N,S> stateMap = getMapForKeyGroup(keyGroup); 这里获取到CopyOnWriteStateMap
        • 3.1.2)stateMap.transform(key,namespace,value,transformation);
          • 3.1.2.1)调用 AggregateTransformation.apply,其又调用 aggFunction.add(value,accumulator);
            • 3.1.2.1.1)调用 GroupingWindowAggregateHelper.accumulate(accumulatorRow,value.row),其又调用 用户定义的 accumulate

可以看到,是 windowState 添加元素时候,调用到State的API,然后间接调用到了UDAF

3.3.2 windowState & UDAF执行

windowState 以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。我们此例中,存储的是执行结果

本例用到的 window process 是 Incremental Aggregation Functions。即 ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。

因此这里我们拿到的是 HeapReducingState, HeapAggregatingState,当执行到 windowState.add(element.getValue());语句时,便调用UDAF得出结果

3.3.3 State & 结果存储

在flink中state用来存放计算过程的节点中间结果或元数据。在flink内部提供三种state存储实现

  • 内存HeapStateBackend:存放数据量小,用于开发测试使用;生产不建议使用
  • HDFS的FsStateBackend :分布式文件持久化,每次都会产生网络io,可用于大state,不支持增量;可用于生产
  • RocksDB的RocksDBStateBackend:本地文件 + 异步hdfs持久化,也可用于大state数据量,唯一支持增量,可用于生产;

我们这里拿到的是 HeapAggregatingState

3.3.4 State 存储结构

以三元组的形式存储保存数据,即 key,value

public abstract class StateTable<K,S>
	implements StateSnapshotRestore,Iterable<StateEntry<K,S>> {
   /**
   * Map for holding the actual state objects. The outer array represents the key-groups.
   * All array positions will be initialized with an empty state map.
   */
	protected final StateMap<K,S>[] keyGroupedStateMaps;
}

// 真实中变量摘录如下
keyGroupedStateMaps = {StateMap[1]@9266} 
 0 = {CopyOnWriteStateMap@9262} // 这里就是将要保存用户accumulator的地方
  stateSerializer = {RowSerializer@9254} 
  snapshotVersions = {TreeSet@9277}  size = 0
  primaryTable = {CopyOnWriteStateMap$StateMapEntry[128]@9278} 
  incrementalRehashTable = {CopyOnWriteStateMap$StateMapEntry[2]@9280} 
  lastNamespace = {TimeWindow@9239} "TimeWindow{start=1593934200000,end=1593934210000}"

在上面提及的 3.1.2)stateMap.transform(key,transformation);

@Override
public <T> void transform(
   K key,N namespace,T value,StateTransformationFunction<S,T> transformation) throws Exception {

   final StateMapEntry<K,S> entry = putEntry(key,namespace);

   // copy-on-write check for state
   entry.state = transformation.apply(
      (entry.stateVersion < highestRequiredSnapshotVersion) ?
         getStateSerializer().copy(entry.state) : entry.state,value); 
   // 当执行完用户代码之后,数据会存储在这里,这个就是CopyOnWriteStateMap的一个Entry
   entry.stateVersion = stateMapVersion;

3.4 总结

流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。

  • window接受到新输入,就会往 windowState 添加元素。
  • windowState 添加元素时候,调用到State的API,然后间接调用到了UDAF
  • windowState 在本例存储的是UDAF执行结果。具体存储是在HeapAggregatingState中完成。

0xFF 参考

Flink - 当数据流入window时,会发生什么

Flink SQL 自定义UDAF

自定义聚合函数(UDAF)

Apache Flink - 常见数据流类型

Flink-SQL源码解读(一)window算子的创建的源码分析

从udaf谈flink的state

Apache Flink - 常见数据流类型

Flink状态管理(二)状态数据结构和注册流程

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。