[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

0x00 摘要

"Flink SQL UDF不应有状态" 这个技术细节可能有些朋友已经知道了。但是为什么不应该有状态呢?这个恐怕大家就不甚清楚了。本文就带你一起从这个问题点入手,看看Flink SQL究竟是怎么处理UDF,怎么生成对应的SQL代码。

0x01 概述结论

先说结论,后续一步步给大家详述问题过程。

1. 问题结论

结论是:Flink内部对SQL生成了java代码,但是这些java代码针对SQL做了优化,导致在某种情况下,可能 会对 "在SQL中本应只调用一次" 的UDF 重复调用

  • 我们在写SQL时候,经常会在SQL中只写一次UDF,我们认为运行时候也应该只调用一次UDF。
  • 对于SQL,Flink是内部解析处理之后,把SQL语句转化为Flink原生算子来处理。大家可以认为是把SQL翻译成了java代码再执行,这些代码针对 SQL做了优化。
  • 对于UDF,Flink也是内部生成java代码来处理,这些代码也针对SQL做了优化。
  • 在Flink内部生成的这些代码中,Flink会在某些特定情况下,对 "在SQL中本应只调用一次" 的UDF 重复调用
  • Flink生成的内部代码,是把"投影运算"和"过滤条件"分别生成,然后拼接在一起。优化后的"投影运算"和"过滤条件"分别调用了UDF,所以拼接之后就会有多个UDF调用。
  • 因为实际上编写时候的一次UDF,优化后可能调用了多次,所以UDF内部就不应该有状态信息。

比如:

1. myFrequency 这个字段是由 UDF_FRENQUENCY 这个UDF函数 在本步骤生成。

SELECT word,UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount

2. 按说下面SQL语句就应该直接取出 myFrequency 即可。因为 myFrequency 已经存在了。

SELECT word,myFrequency FROM TableFrequency WHERE myFrequency <> 0

但是因为Flink做了一些优化,把 第一个SQL中 UDF_FRENQUENCY 的计算下推到了 第二个SQL。

3. 优化后实际就变成了类似这样的SQL。

SELECT word,UDF_FRENQUENCY(frequency) FROM tableFrequency WHERE UDF_FRENQUENCY(frequency) <> 0

4. 所以UDF_FRENQUENCY就被执行了两次:在WHERE中执行了一次,在SELECT中又执行了一次。

Flink针对UDF所生成的Java代码 简化转义 版如下,能看出来调用了两次:

  // 原始 SQL "SELECT word,myFrequency FROM TableFrequency WHERE myFrequency <> 0"

    java.lang.Long result$12 = UDF_FRENQUENCY(frequency); // 这次 UDF 调用对应 WHERE myFrequency <> 0
    
    if (result$12 != 0) { // 这里说明 myFrequency <> 0,于是可以进行 SELECT
      
      // 这里对应的是 SELECT myFrequency,注意的是,按我们一般的逻辑,应该直接复用result$12,但是这里又调用了 UDF,重新计算了一遍。所以 UDF 才不应该有状态信息。
	    java.lang.Long result$9 = UDF_FRENQUENCY(frequency);  

	    long select;
      
	    if (result$9 == null) {
	      select = -1L;
	    }
	    else {
	      select = result$9; // 这里最终 SELECT 了 myFrequency
	    }
    }

2. 问题流程

实际上就是Flink生成SQL代码的流程,其中涉及到几个重要的节点举例如下:

关于具体SQL流程,请参见我之前的文章:[源码分析] 带你梳理 Flink SQL / Table API内部执行流程

// NOTE : 执行顺序是从上至下," -----> " 表示生成的实例类型
* 
*        +-----> "SELECT xxxxx WHERE UDF_FRENQUENCY(frequency) <> 0" // (SQL statement)
*        |    
*        |     
*        +-----> LogicalFilter (RelNode) // Abstract Syntax Tree,未优化的RelNode   
*        |      
*        |     
*    FilterToCalcRule (RelOptRule) // Calcite优化rule     
*        | 
*        |   
*        +-----> LogicalCalc (RelNode)  // Optimized Logical Plan,逻辑执行计划
*        |  
*        |    
*    DataSetCalcRule (RelOptRule) // Flink定制的优化rule,转化为物理执行计划
*        |       
*        |   
*        +-----> DataSetCalc (FlinkRelNode) // Physical RelNode,物理执行计划
*        |      
*        |     
*    DataSetCalc.translateToPlanInternal  // 作用是生成Flink算子  
*        |     
*        |     
*        +-----> FlatMapRunner (Operator) // In Flink Task   
*        |     
*        |    

这里的几个关键点是:

  • "WHERE UDF_FRENQUENCY(frequency) <> 0" 这部分SQL对应Calcite的逻辑算子是 LogicalFilter
  • LogicalFilter被转换为LogicalCalc,经过思考我们可以知道,Filter的Condition条件是需要进行计算才能获得的,所以需要转换为Calc
  • DataSetCalc中会生成SQL对应的JAVA代码,这个java类是:DataSetCalcRule extends RichFlatMapFunction。这点很有意思,Flink认为第二条SQL是一个Flatmap操作
  • 为什么UDF对应的第二条SQL是一个Flatmap操作。因为UDF的输入实际是一个数据库记录Record,这很像集合;输出的是数目不等的几部分。这恰恰是Flatmap的思想所在

关于FlatMap,请参见我之前的文章:[源码分析] 从FlatMap用法到Flink的内部实现

我们后文中主要就是排查SQL生成流程中哪里出现了这个"UDF多次调用的问题点"

0x02 UDX

1. UDX (自定义函数)

Flink实时计算支持以下3类自定义函数

UDX分类 描述
UDF(User Defined Function) 用户自定义标量值函数(User Defined Scalar Function)。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDAF(User Defined Aggregation Function) 自定义聚合函数,其输入与输出是多对一的关系, 即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。
UDTF(User Defined Table-valued Function) 自定义表值函数,调用一次函数输出多行或多列数据。

2. 自定义标量函数 Scalar Functions (UDF)

用户定义的标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。

实现一个标量函数需要继承ScalarFunction,并且实现一个或者多个evaluation方法。标量函数的行为就是通过evaluation方法来实现的。evaluation方法必须定义为public,命名为eval。evaluation方法的输入参数类型和返回值类型决定着标量函数的输入参数类型和返回值类型。

另外 UDF 也有open方法和close方法可选。我们稍后会提到。

3. 自定义聚合函数(UDAF)

自定义聚合函数(UDAF)将多条记录聚合成1条记录。

聚合函数需要继承AggregateFunction。聚合函数工作方式如下:

  • 首先,需要一个accumulator,这个是保存聚合中间结果的数据结构。调用AggregateFunction函数的createAccumulator()方法来创建一个空accumulator.
  • 随后,每个输入行都会调用accumulate()方法来更新accumulator。一旦所有的行被处理了,getValue()方法就会被调用,计算和返回最终的结果。

createAccumulator、getValue 和 accumulate3个方法一起使用,就能设计出一个最基本的UDAF。但是实时计算一些特殊的场景需要您提供retract和merge两个方法才能完成。

4. 自定义表值函数(UDTF)

自定义表值函数(UDTF)与自定义的标量函数类似,自定义的表值函数(UDTF)将0个、1个或多个标量值作为输入参数(可以是变长参数)。与标量函数不同,表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。

为了自定义表函数,需要继承TableFunction,实现一个或者多个evaluation方法。表函数的行为定义在这些evaluation方法内部,函数名为eval并且必须是public。

UDTF可以通过多次调用collect()实现将1行的数据转为多行返回。

UDTF不仅可以做到1行转多行,还可以1列转多列。如果您需要UDTF返回多列,只需要将返回值声明成Tuple或Row。

5. RichFunction

RichFunction是Flink提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

这里专门提到RichFunction,是因为Flink是把UDF做为RichFunction的一部分来实现,即UDF就是RichFunction的成员变量function。所以open,close这两个函数就是在RichFunction的相关同名函数中被调用,而eval函数在RichFunction的业务函数中被调用,比如下文中的function.flatMap就是调用了 UDF.eval:

  override def flatMap(in: Row,out: Collector[Row]): Unit =
    function.flatMap(in,out) 

没有相关经验的同学应该可以深入了解RichFunction用法。

0x03 实例代码

以下是我们的示例程序,后续就讲解这个程序的生成代码。

1. UDF函数

这里只实现了eval函数,没有实现open,close。

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class myUdf extends ScalarFunction {
    private Long current = 0L;
    private static final Logger LOGGER = LoggerFactory.getLogger(myUdf.class);
    public Long eval(Long a) throws Exception {
        if(current == 0L) {
            current = a;
        } else  {
            current += 1;
        }
        LOGGER.error("The current is : " + current );
        return current;
    }
}

2. 测试代码

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

object TestUdf {

  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    val input = env.fromElements(WC("hello",1),WC("hello",WC("ciao",1))

    tEnv.registerFunction("UDF_FRENQUENCY",new myUdf())

    // register the DataSet as a view "WordCount"
    tEnv.createTemporaryView("TableWordCount",input,'word,'frequency)

    val tableFrequency = tEnv.sqlQuery("SELECT word,UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount")
    tEnv.registerTable("TableFrequency",tableFrequency)

    // run a SQL query on the Table and retrieve the result as a new Table
    val table = tEnv.sqlQuery("SELECT word,myFrequency FROM TableFrequency WHERE myFrequency <> 0")

    table.toDataSet[WC].print()
  }

  case class WC(word: String,frequency: Long)
}

3. 输出结果

// 输出如下,能看到本来应该是调用三次,结果现在调用了六次

11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 1
11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 2
11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 3
11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 4
11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 5
11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 6

1. 注册UDF

实例中,我们使用了registerFunction函数,将UDF注册到了TableEnvironment之中。

    tEnv.registerFunction("UDF_FRENQUENCY",new myUdf())

TableEnvironment

TableEnvironment 是Table API和SQL集成的核心概念,它主要负责:

  • 在内部目录Catalog中注册一个Table,TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
  • 注册一个外部目录Catalog
  • 执行SQL查询
  • 注册一个用户自定义函数(标量、表及聚合)
  • 将DataStream或者DataSet转换成Table
  • 持有ExecutionEnvironment或者StreamExecutionEnvironment的引用

FunctionCatalog

在Flink中,Catalog是目录概念,即所有对数据库和表的元数据信息都存放再Flink CataLog内部目录结构中,其存放了flink内部所有与Table相关的元数据信息,包括表结构信息/数据源信息等。

所有UDF都是注册在TableEnvImpl.functionCatalog 这个成员变量之中。这是专门存储 "Table API/SQL函数定义" 的函数目录 (Simple function catalog)。

FunctionCatalog类具有如下两个成员变量,都是LinkedHashMap。

// FunctionCatalog,Table API/SQL function catalog
public class FunctionCatalog implements FunctionLookup {
	private final Map<String,FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
	private final Map<ObjectIdentifier,FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
}

tempCatalogFunctions:对应着SQL语句中的 "CREATE FUNCTION "功能,即Function DDL语法。其主要应用场景如下:

  • 从classpath加载UDF
CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS ‘com.xxx.udf.func1UDF’ LANGUAGE ’JVM’
DROP FUNCTION catalog1.db1.geofence
  • 从远程资源加载UDF
CREATE FUNCTION catalog1.db1.func2 AS ‘com.xxx.udf.func2UDF’ LANGUAGE JVM USING ‘http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar’
  • 从远程资源加载python UDF
CREATE FUNCTION catalog1.db1.func3 AS ‘com.xxx.udf.func3UDF’ LANGUAGE ‘PYTHON’ USING ‘http://external.resources/flink-udf.py’

tempSystemFunctions :存储UDX函数,就是本文所要阐述的内容。

经过本阶段之后,myUdf 这个UDX函数,就做为 "UDF_FRENQUENCY" 注册到了系统中,可以在后续的SQL中进行调用操作

2. LogicalFilter

此时,Flink已经完成了如下操作:

  • SQL 解析阶段,生成AST(抽象语法树)(SQL–>SqlNode)
  • SqlNode 验证(SqlNode–>SqlNode)
  • 语义分析,生成逻辑计划(Logical Plan)(SqlNode–>RelNode/RexNode)

Flink将RelNode串成了一个链,具体是由类实例的input完成这个串联任务,即input指向本实例的上游输入。

LogicalFilter的 input 是 LogicalProject,LogicalProject 的 input 是FlinkLogicalDataSetScan。而FlinkLogicalDataSetScan 的table中就可以知道具体输入表的信息。

这个RelNode链具体如下。

== Abstract Syntax Tree ==
LogicalProject(word=[$0],myFrequency=[$1])
  LogicalFilter(condition=[<>($1,0)])
    LogicalProject(word=[$0],myFrequency=[UDF_FRENQUENCY($1)])
      FlinkLogicalDataSetScan(ref=[1976870927],fields=[word,frequency])
  
每一部分都是由 input 指向完成的。

这里的重点是 " myFrequency <> 0" 被转换为 LogicalFilter。这倒是容易理解,因为 WHERE 子句实际就是用来过滤的,所以转换为 LogicalFilter合情合理。

另外需要注意的是:在构建RelNode链的时候 ,Flink已经从TableEnvImpl.functionCatalog 这个成员变量之中提取到了之前注册的myUdf 这个UDF函数实例。当需要获取UDF实例时候,calcite会在 SqlOperatorTable table 中寻找UDF,进而就调用到了FunctionCatalog.lookupFunction这里,从LinkedHashMap中取得实例。

具体是SqlToRelConverter函数中会将SQL语句转换为RelNode,在SqlToRelConverter (org.apache.calcite.sql2rel)完成,其打印内容摘要如下:

filter = {LogicalFilter@4814} "LogicalFilter#2"
 variablesSet = {RegularImmutableSet@4772}  size = 0
 condition = {RexCall@4771} "<>($1,0)"
 input = {LogicalProject@4770} "LogicalProject#1"
  exps = {RegularImmutableList@4821}  size = 2
  input = {FlinkLogicalDataSetScan@4822} "FlinkLogicalDataSetScan#0"
   cluster = {RelOptCluster@4815} 
   catalog = {CatalogReader@4826} 
   dataSet = {DataSource@4827} 
   fieldIdxs = {int[2]@4828} 
   schema = {RelRecordType@4829} "RecordType(VARCHAR(65536) word,BIGINT frequency)"
   table = {RelOptTableImpl@4830} 
    schema = {CatalogReader@4826} 
    rowType = {RelRecordType@4829} "RecordType(VARCHAR(65536) word,BIGINT frequency)"
展开查看调用栈

create:107,LogicalFilter (org.apache.calcite.rel.logical)
createFilter:333,RelFactories$FilterFactoryImpl (org.apache.calcite.rel.core)
convertWhere:993,SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:649,SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627,SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181,SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563,SqlToRelConverter (org.apache.calcite.sql2rel)
rel:150,FlinkPlannerImpl (org.apache.flink.table.calcite)
rel:135,FlinkPlannerImpl (org.apache.flink.table.calcite)
toQueryOperation:490,SqlToOperationConverter (org.apache.flink.table.sqlexec)
convertSqlQuery:315,SqlToOperationConverter (org.apache.flink.table.sqlexec)
convert:155,SqlToOperationConverter (org.apache.flink.table.sqlexec)
parse:66,ParserImpl (org.apache.flink.table.planner)
sqlQuery:457,TableEnvImpl (org.apache.flink.table.api.internal)
main:55,TestUdf$ (mytestpackage)
main:-1,TestUdf (mytestpackage)

3. FilterToCalcRule

下面是优化部分。优化规则分为两类,一类是Calcite提供的内置优化规则(如条件下推,剪枝等),另一类是是将Logical Node转变成 Flink Node 的规则。

这里Flink发现了FilterToCalcRule 这个rule适合对Filter进行切换。

我们思考下可知,Filter的Condition条件是需要进行计算才能获得的,所以需要转换为Calc

具体源码在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)

call = {VolcanoRuleMatch@5576} "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1,0))]"
 targetSet = {RelSet@5581} 
 targetSubset = null
 digest = "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,0))]"
 cachedImportance = 0.891
 volcanoPlanner = {VolcanoPlanner@5526} 
 generatedRelList = null
 id = 45
 operand0 = {RelOptRuleOperand@5579} 
 nodeInputs = {RegularImmutableBiMap@5530}  size = 0
 rule = {FilterToCalcRule@5575} "FilterToCalcRule"
 rels = {RelNode[1]@5582} 
 planner = {VolcanoPlanner@5526} 
 parents = null
展开查看调用栈

onMatch:65,FilterToCalcRule (org.apache.calcite.rel.rules)
onMatch:208,VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631,VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327,Programs$RuleSetProgram (org.apache.calcite.tools)
runVolcanoPlanner:280,Optimizer (org.apache.flink.table.plan)
optimizeLogicalPlan:199,Optimizer (org.apache.flink.table.plan)
optimize:56,BatchOptimizer (org.apache.flink.table.plan)
translate:280,BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69,BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53,TableConversions (org.apache.flink.table.api.scala)
main:57,TestUdf (mytestpackage)

4. LogicalCalc

因为上述的FilterToCalcRule,所以生成了 LogicalCalc。我们也可以看到这里就是包含了UDF_FRENQUENCY

calc = {LogicalCalc@5632} "LogicalCalc#60"
 program = {RexProgram@5631} "(expr#0..1=[{inputs}],expr#2=[UDF_FRENQUENCY($t1)],expr#3=[0:BIGINT],expr#4=[<>($t2,$t3)],proj#0..1=[{exprs}],$condition=[$t4])"
 input = {RelSubset@5605} "rel#32:Subset#0.LOGICAL"
 desc = "LogicalCalc#60"
 rowType = {RelRecordType@5629} "RecordType(VARCHAR(65536) word,BIGINT frequency)"
 digest = "LogicalCalc#60"
 cluster = {RelOptCluster@5596} 
 id = 60
 traitSet = {RelTraitSet@5597}  size = 1

5. DataSetCalc

经过转换,最后得到了physical RelNode,即物理 RelNode 执行计划 DataSetCalc。

== Optimized Logical Plan ==
DataSetCalc(select=[word,UDF_FRENQUENCY(frequency) AS myFrequency],where=[<>(UDF_FRENQUENCY(frequency),0:BIGINT)])
  DataSetScan(ref=[1976870927],frequency])

具体源码在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)。

// 这里给出了执行函数,运行内容和调用栈
  
ConverterRule.onMatch(RelOptRuleCall call) {
        RelNode rel = call.rel(0);
        if (rel.getTraitSet().contains(this.inTrait)) {
            RelNode converted = this.convert(rel);
            if (converted != null) {
                call.transformTo(converted);
            }
        }
}

// 转换后的 DataSetCalc 内容如下

converted = {DataSetCalc@5560} "Calc(where: (<>(UDF_FRENQUENCY(frequency),0:BIGINT)),select: (word,UDF_FRENQUENCY(frequency) AS myFrequency))"
 cluster = {RelOptCluster@5562} 
 rowRelDataType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word,BIGINT myFrequency)"
 calcProgram = {RexProgram@5566} "(expr#0..1=[{inputs}],word=[$t0],myFrequency=[$t2],$condition=[$t4])"
 ruleDescription = "DataSetCalcRule"
 program = {RexProgram@5566} "(expr#0..1=[{inputs}],$condition=[$t4])"
 input = {RelSubset@5564} "rel#71:Subset#5.DATASET"
 desc = "DataSetCalc#72"
 rowType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word,BIGINT myFrequency)"
 digest = "DataSetCalc#72"
 AbstractRelNode.cluster = {RelOptCluster@5562} 
 id = 72
 traitSet = {RelTraitSet@5563}  size = 1
展开查看调用栈

init:52,DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
convert:40,DataSetCalcRule (org.apache.flink.table.plan.rules.dataSet)
onMatch:144,ConverterRule (org.apache.calcite.rel.convert)
onMatch:208,Optimizer (org.apache.flink.table.plan)
optimizePhysicalPlan:209,Optimizer (org.apache.flink.table.plan)
optimize:57,TestUdf (mytestpackage)

6. generateFunction (问题点所在)

在DataSetCalc中,会最后生成UDF对应的JAVA代码。

class DataSetCalc {
  
  override def translateToPlan(
      tableEnv: BatchTableEnvImpl,queryConfig: BatchQueryConfig): DataSet[Row] = {

    ......
    
    // 这里生成了UDF对应的JAVA代码
    val genFunction = generateFunction(
      generator,ruleDescription,new RowSchema(getRowType),projection,condition,config,classOf[FlatMapFunction[Row,Row]])

    // 这里生成了FlatMapRunner
    val runner = new FlatMapRunner(genFunction.name,genFunction.code,returnType)

    inputDS.flatMap(runner).name(calcOpName(calcProgram,getExpressionString))
  }  
}
展开查看调用栈

translateToPlan:90,DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translate:306,BatchTableEnvImpl (org.apache.flink.table.api.internal)
translate:281,TestUdf (mytestpackage)

真正生成代码的位置如下,能看出来生成代码是FlatMapFunction。而本文的问题点就出现在这里

具体原因从下面代码的注释中能够看出:针对本示例代码,最后是生成了

  • 投射内容,就是 SELECT。filterCondition实际上已经生成包含了调用UDF的代码
  • 过滤条件,就是 WHERE。projection实际上已经生成包含了调用UDF的代码
  • 生成类的部分代码,这里对应的是UDF的业务内容,这里就是简单的把“投射内容”和“过滤条件”拼接在一起,并没有做优化,所以就形成了两个UDF调用。
// 下面能看出,针对不同的SQL子句,Flink会进行不同的转化

trait CommonCalc {

  private[flink] def generateFunction[T <: Function](
      generator: FunctionCodeGenerator,ruleDescription: String,returnSchema: RowSchema,calcProjection: Seq[RexNode],calcCondition: Option[RexNode],config: TableConfig,functionClass: Class[T]):
    GeneratedFunction[T,Row] = {

    // 生成投射内容,就是 SELECT。filterCondition实际上已经生成包含了调用UDF的代码,下面会给出其内容
    val projection = generator.generateResultExpression(
      returnSchema.typeInfo,returnSchema.fieldNames,calcProjection)

    // only projection
    val body = if (calcCondition.isEmpty) {
      s"""
        |${projection.code}
        |${generator.collectorTerm}.collect(${projection.resultTerm});
        |""".stripMargin
    }
    else {
      // 生成过滤条件,就是 WHERE。filterCondition实际上已经生成包含了调用UDF的代码,下面会给出其内容
      val filterCondition = generator.generateExpression(calcCondition.get)
        
      // only filter
      if (projection == null) {
        s"""
          |${filterCondition.code}
          |if (${filterCondition.resultTerm}) {
          |  ${generator.collectorTerm}.collect(${generator.input1Term});
          |}
          |""".stripMargin
      }
      // both filter and projection
      else {
        // 本例中,会进入到这里。把 filterCondition 和 projection 代码拼接起来。这下子就有了两个 UDF 的调用。
        s"""
          |${filterCondition.code}
          |if (${filterCondition.resultTerm}) {
          |  ${projection.code}
          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
          |}
          |""".stripMargin
      }
    }

    // body 是filterCondition 和 projection 代码的拼接,分别都有 UDF 的调用,现在就有了两个UDF调用了,也就是我们问题所在。
    generator.generateFunction(
      ruleDescription,functionClass,body,returnSchema.typeInfo)
  }
}

// 此函数输入中,calcCondition就是我们SQL的过滤条件

calcCondition = {Some@5663} "Some(<>(UDF_FRENQUENCY($1),0))"

// 此函数输入中,calcProjection就是我们SQL的投影运算条件
  
calcProjection = {ArrayBuffer@5662} "ArrayBuffer" size = 2
 0 = {RexInputRef@7344} "$0"
 1 = {RexCall@7345} "UDF_FRENQUENCY($1)"
  
// 生成过滤条件,就是 WHERE 对应的代码。filterCondition实际上已经生成包含了调用UDF的代码
  
filterCondition = {GeneratedExpression@5749} "GeneratedExpression(result$16,isNull$17,\n\n\n\njava.lang.Long result$12 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n  result$13 = -1L;\n}\nelse {\n  result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n  result$16 = false;\n}\nelse {\n  result$16 = result$13 != result$15;\n}\n,Boolean,false)"
    
// 生成投影运算,就是 SELECT 对应的代码。projection也包含了调用UDF的代码  
  
projection = {GeneratedExpression@5738} "GeneratedExpression(out,false,\n\nif (isNull$6) {\n  out.setField(0,null);\n}\nelse {\n  out.setField(0,result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n  result$10 = -1L;\n}\nelse {\n  result$10 = result$9;\n}\n\n\nif (isNull$11) {\n  out.setField(1,null);\n}\nelse {\n  out.setField(1,result$10);\n}\n,Row(word: String,myFrequency: Long),false)"
  
// 具体这个类其实是 DataSetCalcRule extends RichFlatMapFunction 
name = "DataSetCalcRule"
  
// 生成的类  
clazz = {Class@5773} "interface org.apache.flink.api.common.functions.FlatMapFunction"
  
// 生成类的部分代码,这里对应的是UDF的业务内容
bodyCode = "\n\n\n\n\njava.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n  result$13 = -1L;\n}\nelse {\n  result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n  result$16 = false;\n}\nelse {\n  result$16 = result$13 != result$15;\n}\n\nif (result$16) {\n  \n\nif (isNull$6) {\n  out.setField(0,result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n  result$10 = -1L;\n}\nelse {\n  result$10 = result$9;\n}\n\n\nif (isNull$11) {\n  out.setField(1,result$10);\n}\n\n  c.collect(out);\n}\n"
展开查看调用栈

generateFunction:94,FunctionCodeGenerator (org.apache.flink.table.codegen)
generateFunction:79,CommonCalc$class (org.apache.flink.table.plan.nodes)
generateFunction:45,DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translateToPlan:105,TestUdf (mytestpackage)

7. FlatMapRunner

最后还要重点说明下Flink对于SQL代码最后的转换包装。

前面提到了,Flink把UDF做为RichFunction的一部分来实现。事实上,Flink是把SQL整条语句转化为一个RichFunction。示例中的两条SQL语句,分别转换为 RichMapFunction 和 RichFlatMapFunction。具体从下面物理执行计划中可以看出。

== Physical Execution Plan ==
Stage 3 : Data Source
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 2 : Map
		content : from: (word,frequency)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 1 : FlatMap
			content : where: (<>(UDF_FRENQUENCY(frequency),UDF_FRENQUENCY(frequency) AS myFrequency)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : FlatMap
			Partitioning : RANDOM_PARTITIONED

			Stage 0 : Data Sink
				content : org.apache.flink.api.java.io.DiscardingOutputFormat
				ship_strategy : Forward
				exchange_mode : PIPELINED
				Partitioning : RANDOM_PARTITIONED

我们在org.apache.flink.table.runtime目录下,可以看到Flink针对每一种 physical RelNode,都定义了一种RichFunction,摘录如下:

CRowCorrelateProcessRunner.scala        FlatMapRunner.scala
CRowMapRunner.scala                     MapJoinLeftRunner.scala
CRowOutputProcessRunner.scala           MapJoinRightRunner.scala
CRowProcessRunner.scala                 MapRunner.scala
CorrelateFlatMapRunner.scala            MapSideJoinRunner.scala
FlatJoinRunner.scala

实例中第二条SQL语句其类别就是 DataSetCalcRule extends RichFlatMapFunction。从定义能够看出来,FlatMapRunner继承了RichFlatMapFunction,说明 Flink认为本条SQL就是一个Flatmap操作

package org.apache.flink.table.runtime

class FlatMapRunner(
    name: String,code: String,@transient var returnType: TypeInformation[Row])
  extends RichFlatMapFunction[Row,Row] ... {

  private var function: FlatMapFunction[Row,Row] = _

  ...

  override def flatMap(in: Row,out)

  ...
}

0x05 UDF生成的代码

1. 缩减版

这里是生成的代码缩减版,能看出具体问题点,myUdf函数被执行了两次。

function_mytestpackage\(myUdf\)c45b0e23278f15e8f7d075abac9a121b 这个就是 myUdf 转换之后的函数。

  // 原始 SQL "SELECT word,myFrequency FROM TableFrequency WHERE myFrequency <> 0"
 
    java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7); // 这次 UDF 调用对应 WHERE myFrequency <> 0

    boolean isNull$14 = result$12 == null; 
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    
    if (result$16) { // 这里说明 myFrequency <> 0,所以可以进入
	    java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
	      isNull$8 ? null : (java.lang.Long) result$7); // 这里对应的是 SELECT myFrequency,注意的是,这里又调用了 UDF,重新计算了一遍,所以 UDF 才不应该有状态信息。 
	    boolean isNull$11 = result$9 == null;
	    long result$10;
	    if (isNull$11) {
	      result$10 = -1L;
	    }
	    else {
	      result$10 = result$9; // 这里才进行SELECT myFrequency,但是这时候 UDF 已经被计算两次了
	    }
    }

2. 完整版

以下是生成的代码,因为是自动生成,所以看起来会有点费劲,不过好在已经是最后一步了。

public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {

  final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;

  final org.apache.flink.types.Row out =
      new org.apache.flink.types.Row(2);
  
  private org.apache.flink.types.Row in1;

  public DataSetCalcRule$18() throws Exception {
    
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
    org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
      "rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",org.apache.flink.table.functions.UserDefinedFunction.class); 
  }

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
  }

  @Override
  public void flatMap(Object _in1,org.apache.flink.util.Collector c) throws Exception {
    in1 = (org.apache.flink.types.Row) _in1;
    
    boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
    java.lang.String result$5;
    if (isNull$6) {
      result$5 = "";
    }
    else {
      result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
    }
    
    boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
    long result$7;
    if (isNull$8) {
      result$7 = -1L;
    }
    else {
      result$7 = (java.lang.Long) in1.getField(1);
    }

    java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);

    boolean isNull$14 = result$12 == null;
    long result$13;
    if (isNull$14) {
      result$13 = -1L;
    }
    else {
      result$13 = result$12;
    }

    long result$15 = 0L;
    
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    
    if (result$16) {
    
        if (isNull$6) {
          out.setField(0,null);
        }
        else {
          out.setField(0,result$5);
        }

        java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
          isNull$8 ? null : (java.lang.Long) result$7);

        boolean isNull$11 = result$9 == null;
        long result$10;
        if (isNull$11) {
          result$10 = -1L;
        }
        else {
          result$10 = result$9;
        }

        if (isNull$11) {
          out.setField(1,null);
        }
        else {
          out.setField(1,result$10);
        }

          c.collect(out);
        }
  }

  @Override
  public void close() throws Exception {  
    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
  }
}

0x06 总结

至此,我们把Flink SQL如何生成JAVA代码的流程大致走了一遍。

Flink生成的内部代码,是把"投影运算"和"过滤条件"分别生成,然后拼接在一起

即使原始SQL中只有一次UDF调用,但是如果SELECT和WHERE都间接用到了UDF,那么最终"投影运算"和"过滤条件"就会分别调用了UDF,所以拼接之后就会有多个UDF调用。

这就是 "UDF不应该有内部历史状态" 的最终原因。我们在实际开发过程中一定要注意这个问题。

0x07 参考

UDX概述 https://help.aliyun.com/document_detail/69463.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。