[Source code analysis] Walking through the internal execution flow of Flink SQL / Table API

0x00 Abstract

This article outlines the internal implementation of Flink SQL / Table API and connects the whole pipeline "from SQL statement to actual execution" for you. It also provides as many call stacks as possible, so that when you hit a problem you know where to set breakpoints, and so that you gain a deeper understanding of the overall architecture.

The SQL flow passes through several important nodes, illustrated below:

// NOTE: execution order is from top to bottom; " -----> " marks the type of instance produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase: generates the AST (abstract syntax tree), SQL -> SqlNode
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, SqlNode -> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree, unoptimized RelNode
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
*    VolcanoRuleCall.onMatch // optimizes the logical plan with Flink-specific rules
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // optimized logical plan
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
*    VolcanoRuleCall.onMatch // converts the optimized logical plan into Flink's physical plan using Flink rules
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // stream physical RelNode, the physical plan
*        |      
*        |     
*    StreamExecJoin.translateToPlanInternal  // generates the StreamOperator, i.e. the Flink operator
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // streaming unbounded join operator, runs in the StreamTask
*        |     
*        |       
*    StreamTwoInputProcessor.processRecord1 // the StreamingJoinOperator is invoked inside TwoInputStreamTask: the actual execution
*        |
*        |  

The rest of this article follows this diagram as its outline.

0x01 Apache Calcite

Flink Table API & SQL provide a unified interface for relational queries over both streaming and static data, and they build on Apache Calcite's query optimization framework and SQL parser.

Why does Flink offer a Table API at all? In short, a relational API has the following advantages:

  • A relational API is declarative
  • Queries can be effectively optimized
  • Queries can be executed efficiently
  • "Everybody" knows SQL

Calcite is the core component here. Apache Calcite is a SQL engine originally built for the Hadoop ecosystem; it provides standard SQL, a variety of query optimizations, and the ability to connect to all kinds of data sources.

1. Calcite concepts

The main Calcite concepts are listed below (a small RelBuilder sketch follows the list):

  • Relational algebra: i.e. relational expressions. They are usually named after verbs, e.g. Sort, Join, Project, Filter, Scan, Sample.
  • Traits: expressions carry various traits; a Trait's satisfies() method tests whether an expression satisfies a given Trait or Convention.
  • Rules: used to transform one expression into another. A rule holds a list of RelOptRuleOperands that decides whether the rule can be applied to a given part of the tree.
  • Planner: the query optimizer. Based on a set of rules and a cost model (e.g. the cost-based VolcanoPlanner or the heuristic HepPlanner), it transforms an expression into a semantically equivalent but more efficient one.
  • RelNode: represents an operation on the data; common operations are Sort, Join, Project, Filter, Scan, etc. It describes an operation on an entire relation rather than the processing of individual rows. A RelNode records its input RelNodes, which is how a RelNode tree is formed.
  • RexNode: a row expression (scalar expression) that describes the processing logic for a single row. Every row expression has a data type, because during validation the compiler infers the result type of each expression. Common row expressions include literals (RexLiteral), variables (RexVariable), and function or operator calls (RexCall). RexNodes are built through RexBuilder.
  • RelTrait: describes a physical property of a logical table; the three main trait types are Convention, RelCollation and RelDistribution.
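
To make RelNode and RexNode concrete, here is a minimal, hypothetical sketch that builds a small RelNode tree with Calcite's RelBuilder. The field names and values are invented for illustration; the filter condition is a RexNode built through the RexBuilder that RelBuilder uses internally.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class RelBuilderSketch {
	public static void main(String[] args) {
		FrameworkConfig config = Frameworks.newConfigBuilder()
				.defaultSchema(Frameworks.createRootSchema(true))
				.build();
		RelBuilder builder = RelBuilder.create(config);

		// Values -> Filter -> Project: each step adds a RelNode;
		// the condition "amount > 2" is a RexNode (a RexCall with two operands).
		RelNode relTree = builder
				.values(new String[]{"user", "amount"}, "Bob", 3, "Alice", 1)
				.filter(builder.call(SqlStdOperatorTable.GREATER_THAN,
						builder.field("amount"), builder.literal(2)))
				.project(builder.field("user"))
				.build();

		// Prints the RelNode tree: LogicalProject <- LogicalFilter <- LogicalValues
		System.out.println(RelOptUtil.toString(relTree));
	}
}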

2. The Calcite processing flow

SQL execution is commonly divided into four phases; Calcite is very similar, but splits the work into five phases (a minimal sketch of these phases against Calcite's own API follows the list):

  1. SQL parsing: generate the AST (abstract syntax tree) (SQL -> SqlNode)

  2. SqlNode validation (SqlNode -> SqlNode)

  3. Semantic analysis: generate the logical plan (SqlNode -> RelNode/RexNode)

  4. Optimization: apply the relevant rules to optimize the plan (RelNode -> RelNode)

  5. ExecutionPlan generation: generate the physical execution plan (DataStream plan)
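
To see how these phases map onto Calcite's public API, here is a minimal, hypothetical sketch using Calcite's Planner facade. The query is self-contained so no catalog setup is needed; Flink drives the same calls through its FlinkPlannerImpl, as shown later in this article.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalcitePhasesSketch {
	public static void main(String[] args) throws Exception {
		FrameworkConfig config = Frameworks.newConfigBuilder()
				.defaultSchema(Frameworks.createRootSchema(true))
				.build();
		Planner planner = Frameworks.getPlanner(config);

		// Phase 1: SQL -> SqlNode (parsing)
		SqlNode parsed = planner.parse(
				"SELECT o.amount * 2 FROM (VALUES (1), (3)) AS o(amount) WHERE o.amount > 2");
		// Phase 2: SqlNode -> SqlNode (validation against the metadata)
		SqlNode validated = planner.validate(parsed);
		// Phase 3: SqlNode -> RelNode/RexNode (semantic analysis, the initial logical plan)
		RelRoot root = planner.rel(validated);
		// Phases 4 and 5 (rule-based optimization and physical plan generation) are driven by a
		// RelOptPlanner such as VolcanoPlanner or HepPlanner; see the optimization section below.
		System.out.println(RelOptUtil.toString(root.rel));
	}
}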

1. How Flink's relational APIs are executed

Flink carries two ways of expressing queries: the Table API and the SQL API. It uses Apache Calcite as its SQL parser for semantic analysis, and both are uniformly turned into a Calcite logical plan (a SqlNode tree); this plan is then validated; next, Calcite's optimizer applies conversion rules to the logical plan, using different rule sets depending on the nature of the data source (stream vs. batch), and produces an optimized RelNode tree (the logical execution plan); finally, the optimized plan is translated into a regular Flink DataSet or DataStream program. Any performance improvement to the DataStream API or DataSet API therefore automatically benefits Table API and SQL queries as well.

From submission, through Calcite parsing and optimization, to execution by the Flink engine, a streaming SQL statement generally goes through the following stages:

  1. SQL Parser: the SQL statement is parsed by JavaCC into an AST (syntax tree); in Calcite the AST is represented by SqlNode;
  2. SQL Validator: the SQL is validated against the catalog (the data dictionary);
  3. Generate the logical plan: the AST represented by SqlNode is converted into a logical plan, represented by RelNode;
  4. Generate the optimized logical plan: the logical plan is first optimized with Calcite rules, then with Flink-specific optimization rules;
  5. Generate the Flink physical plan: again based on Flink's rules, the optimized logical plan is converted into Flink's physical execution plan;
  6. Convert the physical plan into a Flink ExecutionPlan: the corresponding translateToPlan methods are invoked and code generation (CodeGen) produces the various Flink operators.

If the job is submitted through the Table API instead, it also goes through the Calcite optimization stages; the basic flow is similar to running SQL directly:

  1. Table API parser: Flink also represents the computation expressed with the Table API as a tree of tree nodes;
    the computation logic at each node of the tree is represented by an Expression.
  2. Validate: using the catalog, the unresolved expression at each tree node is bound, producing a resolved expression;
  3. Generate the logical plan: each node of the tree is visited in turn and its construct method is called, converting the tree-node representation into Calcite's internal RelNode data structure. This yields the logical plan,
    which is then optimized with Flink-specific rules;
  4. Generate the Flink physical plan: again based on Flink's rules, the optimized logical plan is converted into Flink's physical execution plan;
  5. Convert the physical plan into a Flink ExecutionPlan: the corresponding translateToPlan methods are invoked and code generation (CodeGen) produces the various Flink operators.

As you can see, once a RelNode has been obtained, the Table API and SQL share the same flow; they only differ in how the RelNode is obtained (a minimal end-to-end sketch follows the list):

  • Table API: uses RelBuilder to obtain the RelNode (LogicalNodes and Expressions are converted into RelNodes and RexNodes respectively);
  • SQL: uses the Planner. First the parse method converts the user's SQL text into a parse tree represented by SqlNode. Then the validate method uses metadata to resolve fields, determine types, verify validity, and so on. Finally the rel method converts the SqlNode into a RelNode;
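
To keep both routes concrete, here is a minimal, hypothetical sketch in the spirit of the StreamSQLExample whose debug output appears throughout this article. It assumes a Flink 1.10-era API; exact method names such as createTemporaryView may vary between versions.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamSqlSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings =
				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

		DataStream<Tuple3<Long, String, Integer>> orderA = env.fromElements(
				Tuple3.of(1L, "beer", 3), Tuple3.of(2L, "diaper", 1));
		DataStream<Tuple3<Long, String, Integer>> orderB = env.fromElements(
				Tuple3.of(3L, "rubber", 2), Tuple3.of(4L, "beer", 1));

		tEnv.createTemporaryView("OrderA", orderA, "user, product, amount");
		tEnv.createTemporaryView("OrderB", orderB, "user, product, amount");

		// SQL route: text -> SqlNode -> RelNode, via the Planner
		Table sqlResult = tEnv.sqlQuery(
				"SELECT * FROM OrderA WHERE amount > 2 "
						+ "UNION ALL SELECT * FROM OrderB WHERE amount < 2");

		// Table API route: expression tree -> RelNode, via RelBuilder
		Table apiResult = tEnv.from("OrderA").where("amount > 2");

		tEnv.toAppendStream(sqlResult, Row.class).print();
		tEnv.toAppendStream(apiResult, Row.class).print();
		env.execute("StreamSQLExample sketch");
	}
}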

1. The TableEnvironment object

The TableEnvironment object is at the core of the Table API and SQL integration; it supports the following scenarios:

  • Registering a Table.
  • Registering a TableSource with the TableEnvironment; a TableSource exposes a data storage system as a Table, for example MySQL, HBase, CSV, Kafka, RabbitMQ, and so on.
  • Registering an external catalog, which gives access to data or files in external systems.
  • Executing SQL queries.
  • Registering a user-defined function.
  • Converting a DataStream or DataSet into a Table.

A query can only be bound to one specific TableEnvironment. The TableEnvironment is configured through a TableConfig, which lets you customize query optimization and the translation process.
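
For example, optimization and translation behavior can be tuned through the TableConfig. This is a hedged snippet: the option keys below are Blink planner options, and tEnv stands for a TableEnvironment such as the one created in the sketch above.

// Obtain the TableConfig from the TableEnvironment and tune optimization / translation.
TableConfig conf = tEnv.getConfig();
// Blink planner options are exposed through the underlying Configuration object.
conf.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
conf.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
conf.getConfiguration().setString("table.exec.mini-batch.size", "1000");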

The TableEnvironment execution flow is:

  • TableEnvironment.sqlQuery() is the entry point;

  • Flink's FlinkPlannerImpl performs the parse(sql), validate(sqlNode), and rel(sqlNode) operations;

  • a Table is produced;

A code excerpt follows:

package org.apache.flink.table.api.internal;

@Internal
public class TableEnvironmentImpl implements TableEnvironment {
	private final CatalogManager catalogManager;
	private final ModuleManager moduleManager;
	private final OperationTreeBuilder operationTreeBuilder;
	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();

	protected final TableConfig tableConfig;
	protected final Executor execEnv;
	protected final FunctionCatalog functionCatalog;
	protected final Planner planner;
	protected final Parser parser;  
}  

// The contents of the class, printed at runtime:

this = {StreamTableEnvironmentImpl@4701} 
 functionCatalog = {FunctionCatalog@4702} 
 scalaExecutionEnvironment = {StreamExecutionEnvironment@4703} 
 planner = {StreamPlanner@4704} 
  config = {TableConfig@4708} 
  executor = {StreamExecutor@4709} 
  PlannerBase.config = {TableConfig@4708} 
  functionCatalog = {FunctionCatalog@4702} 
  catalogManager = {CatalogManager@1250} 
  isStreamingMode = true
  plannerContext = {PlannerContext@4711} 
  parser = {ParserImpl@4696} 
 catalogManager = {CatalogManager@1250} 
 moduleManager = {ModuleManager@4705} 
 operationTreeBuilder = {OperationTreeBuilder@4706} 
 bufferedModifyOperations = {ArrayList@4707}  size = 0
 tableConfig = {TableConfig@4708} 
 execEnv = {StreamExecutor@4709} 
 TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702} 
 TableEnvironmentImpl.planner = {StreamPlanner@4704} 
 parser = {ParserImpl@4696} 
 registration = {TableEnvironmentImpl$1@4710} 

2. Catalog

Catalog – defines the metadata and namespaces, including Schema (databases), Table (tables), and RelDataType (type information).

All metadata about databases and tables is stored in Flink's internal catalog structure, which holds all Table-related metadata inside Flink, including table schemas, data source information, and so on (a small registration sketch follows the debug dump below).

// The TableEnvironment contains a CatalogManager
public final class CatalogManager {
	// A map between names and catalogs.
	private Map<String,Catalog> catalogs;  
} 

// The Catalog interface
public interface Catalog {
  ......
  	default Optional<TableFactory> getTableFactory() {
		return Optional.empty();
	}
  ......
}   

// When the data source is defined inside the program, the corresponding catalog is a GenericInMemoryCatalog
public class GenericInMemoryCatalog extends AbstractCatalog {
	public static final String DEFAULT_DB = "default";
	private final Map<String,CatalogDatabase> databases;
	private final Map<ObjectPath,CatalogBaseTable> tables;
	private final Map<ObjectPath,CatalogFunction> functions;
	private final Map<ObjectPath,Map<CatalogPartitionSpec,CatalogPartition>> partitions;

	private final Map<ObjectPath,CatalogTableStatistics> tableStats;
	private final Map<ObjectPath,CatalogColumnStatistics> tableColumnStats;
	private final Map<ObjectPath,Map<CatalogPartitionSpec,CatalogTableStatistics>> partitionStats;
	private final Map<ObjectPath,Map<CatalogPartitionSpec,CatalogColumnStatistics>> partitionColumnStats;
}  

// Debugger contents observed in the program

catalogManager = {CatalogManager@4646} 
 catalogs = {LinkedHashMap@4652}  size = 1
  "default_catalog" -> {GenericInMemoryCatalog@4659} 
   key = "default_catalog"
    value = {char[15]@4668} 
    hash = 552406043
   value = {GenericInMemoryCatalog@4659} 
    databases = {LinkedHashMap@4660}  size = 1
    tables = {LinkedHashMap@4661}  size = 0
    functions = {LinkedHashMap@4662}  size = 0
    partitions = {LinkedHashMap@4663}  size = 0
    tableStats = {LinkedHashMap@4664}  size = 0
    tableColumnStats = {LinkedHashMap@4665}  size = 0
    partitionStats = {LinkedHashMap@4666}  size = 0
    partitionColumnStats = {LinkedHashMap@4667}  size = 0
    catalogName = "default_catalog"
    defaultDatabase = "default_database"
 temporaryTables = {HashMap@4653}  size = 2
 currentCatalogName = "default_catalog"
 currentDatabaseName = "default_database"
 builtInCatalogName = "default_catalog"

3. StreamPlanner

StreamPlanner is part of the new Blink planner.

The new Flink Table architecture makes the query processor pluggable: the community fully kept the original Flink planner (the old planner) and introduced the new Blink planner alongside it, so users can choose between the old planner and the Blink planner (a small sketch of this choice follows the feature list below).

In terms of the model, the old planner does not unify stream and batch jobs; they are implemented differently and are eventually translated to the DataStream API and the DataSet API respectively. The Blink planner instead treats batch data sets as bounded DataStreams, so both stream and batch jobs are ultimately translated to the Transformation API. Architecturally, the Blink planner provides a BatchPlanner and a StreamPlanner for batch and streaming, which share most of their code and much of the optimization logic, whereas the old planner uses two completely independent code bases for batch and streaming, with essentially no shared implementation or optimization logic.

Beyond the advantages in model and architecture, the Blink planner has accumulated many practical features, concentrated in three areas:

  • The Blink planner improves the code generation mechanism, optimizes some operators, and offers many practical new features such as dimension-table joins, Top-N, MiniBatch, streaming deduplication, and data-skew optimizations for aggregations.
  • The Blink planner's optimization strategy is based on a common-subgraph algorithm and includes both cost-based optimization (CBO) and rule-based optimization (RBO), making optimization more comprehensive. It can also obtain data source statistics from the catalog, which is very important for CBO.
  • The Blink planner offers more built-in functions and more standard SQL support; Flink 1.9 fully supports TPC-H, and support for the more advanced TPC-DS is planned for the next release.
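
The choice between the two planners is made when the TableEnvironment is created, through EnvironmentSettings. This is a sketch against the Flink 1.9/1.10-era API; in later releases the old planner was removed.

import org.apache.flink.table.api.EnvironmentSettings;

// Blink planner (BatchPlanner / StreamPlanner), streaming mode:
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
		.useBlinkPlanner()
		.inStreamingMode()
		.build();

// The original ("old") planner, for comparison:
EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
		.useOldPlanner()
		.inStreamingMode()
		.build();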

In code terms, the StreamPlanner shows up in translateToPlan, which dispatches to the machinery that generates the different StreamOperators.

class StreamPlanner(
    executor: Executor,config: TableConfig,functionCatalog: FunctionCatalog,catalogManager: CatalogManager)
  extends PlannerBase(executor,config,functionCatalog,catalogManager,isStreamingMode = true) {

  override protected def translateToPlan(
      execNodes: util.List[ExecNode[_,_]]): util.List[Transformation[_]] = {
    execNodes.map {
      case node: StreamExecNode[_] => node.translateToPlan(this)
      case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
    }
  }
}

@Internal
public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment {

	private <T> DataStream<T> toDataStream(Table table,OutputConversionModifyOperation modifyOperation) {
    
    // When converting back to a DataStream, the planner is invoked to generate the plan.
    
		List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

		Transformation<T> transformation = getTransformation(table,transformations);

		executionEnvironment.addOperator(transformation);
		return new DataStream<>(executionEnvironment,transformation);
	}
}

// Call stack captured while debugging

translateToPlanInternal:85,StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39,StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:39,StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60,StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59,StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234,TraversableLike$$anonfun$map$1 (scala.collection)
apply:234,TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891,Iterator$class (scala.collection)
foreach:1334,AbstractIterator (scala.collection)
foreach:72,IterableLike$class (scala.collection)
foreach:54,AbstractIterable (scala.collection)
map:234,TraversableLike$class (scala.collection)
map:104,AbstractTraversable (scala.collection)
translateToPlan:59,StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153,PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101,TableConversions (org.apache.flink.table.api.scala)
main:89,StreamSQLExample$ (spendreport)
main:-1,StreamSQLExample (spendreport)

4. FlinkPlannerImpl

Flink implements FlinkPlannerImpl as the bridge to Calcite; it performs the parse(sql), validate(sqlNode), and rel(sqlNode) operations.

class FlinkPlannerImpl(
    config: FrameworkConfig,catalogReaderSupplier: JFunction[JBoolean,CalciteCatalogReader],typeFactory: FlinkTypeFactory,cluster: RelOptCluster) {

  val operatorTable: SqlOperatorTable = config.getOperatorTable
  val parser: CalciteParser = new CalciteParser(config.getParserConfig)
  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
  val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
}

// FlinkPlannerImpl is used here
public class ParserImpl implements Parser {
	private final CatalogManager catalogManager;
	private final Supplier<FlinkPlannerImpl> validatorSupplier;
	private final Supplier<CalciteParser> calciteParserSupplier;
  
	@Override
	public List<Operation> parse(String statement) {
		CalciteParser parser = calciteParserSupplier.get();
    // FlinkPlannerImpl is obtained here
		FlinkPlannerImpl planner = validatorSupplier.get();
		// parse the sql query
		SqlNode parsed = parser.parse(statement);
		Operation operation = SqlToOperationConverter.convert(planner,parsed)
			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
		return Collections.singletonList(operation);
	}  
}

// Debugger contents observed in the program

planner = {FlinkPlannerImpl@4659} 
 config = {Frameworks$StdFrameworkConfig@4685} 
 catalogReaderSupplier = {PlannerContext$lambda@4686} 
 typeFactory = {FlinkTypeFactory@4687} 
 cluster = {FlinkRelOptCluster@4688} 
 operatorTable = {ChainedSqlOperatorTable@4689} 
 parser = {CalciteParser@4690} 
 convertletTable = {StandardConvertletTable@4691} 
 sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692} 
 validator = null
   
// Call stack 1

validate:104,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:127,SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66,ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464,TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:82,StreamSQLExample (spendreport)

// Call stack 2

rel:135,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522,SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436,SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154,StreamSQLExample (spendreport)  

5. Table 和 TableImpl

As the code shows, this class simply bundles together the related operations and information; it does not contain much actual logic.

@Internal
public class TableImpl implements Table {

	private static final AtomicInteger uniqueId = new AtomicInteger(0);

	private final TableEnvironment tableEnvironment;
	private final QueryOperation operationTree;
	private final OperationTreeBuilder operationTreeBuilder;
	private final LookupCallResolver lookupResolver;
  
	private TableImpl joinInternal(
			Table right,Optional<Expression> joinPredicate,JoinType joinType) {
		verifyTableCompatible(right);

		return createTable(operationTreeBuilder.join(
			this.operationTree,right.getQueryOperation(),joinType,joinPredicate,false));
	}
}

// Debugger contents observed in the program

view = {TableImpl@4583} "UnnamedTable$0"
 tableEnvironment = {StreamTableEnvironmentImpl@4580} 
  functionCatalog = {FunctionCatalog@4646} 
  scalaExecutionEnvironment = {StreamExecutionEnvironment@4579} 
  planner = {StreamPlanner@4647} 
  catalogManager = {CatalogManager@4644} 
  moduleManager = {ModuleManager@4648} 
  operationTreeBuilder = {OperationTreeBuilder@4649} 
  bufferedModifyOperations = {ArrayList@4650}  size = 0
  tableConfig = {TableConfig@4651} 
  execEnv = {StreamExecutor@4652} 
  TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646} 
  TableEnvironmentImpl.planner = {StreamPlanner@4647} 
  parser = {ParserImpl@4653} 
  registration = {TableEnvironmentImpl$1@4654} 
 operationTree = {ScalaDataStreamQueryOperation@4665} 
  identifier = null
  dataStream = {DataStreamSource@4676} 
  fieldIndices = {int[2]@4677} 
  tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n"
 operationTreeBuilder = {OperationTreeBuilder@4649} 
  config = {TableConfig@4651} 
  functionCatalog = {FunctionCatalog@4646} 
  tableReferenceLookup = {TableEnvironmentImpl$lambda@4668} 
  lookupResolver = {LookupCallResolver@4669} 
  projectionOperationFactory = {ProjectionOperationFactory@4670} 
  sortOperationFactory = {SortOperationFactory@4671} 
  calculatedTableFactory = {CalculatedTableFactory@4672} 
  setOperationFactory = {SetOperationFactory@4673} 
  aggregateOperationFactory = {AggregateOperationFactory@4674} 
  joinOperationFactory = {JoinOperationFactory@4675} 
 lookupResolver = {LookupCallResolver@4666} 
  functionLookup = {FunctionCatalog@4646} 
 tableName = "UnnamedTable$0"
  value = {char[14]@4667} 
  hash = 1355882650

1. SQL parsing (SQL -> SqlNode)

In terms of the outline diagram above, the effect of this step is to produce a SqlNode such as SqlJoin:

// NOTE: execution order is from top to bottom; " -----> " marks the type of instance produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase: generates the AST (abstract syntax tree), SQL -> SqlNode
*        |   
*        |  
*        +-----> SqlJoin (SqlNode)
*        |   
*        |   

Calcite uses JavaCC for SQL parsing. From the Parser.jj file defined in Calcite, JavaCC generates a set of Java classes, and that generated code converts the SQL into the AST data structure (here of type SqlNode).

In other words: the SQL is converted into an AST (abstract syntax tree), represented in Calcite by SqlNode.
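
In isolation, this parsing step looks roughly as follows (a sketch with plain Calcite; Flink wraps the equivalent call in CalciteParser and plugs in its own FlinkSqlParserImpl as the parser implementation):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseSketch {
	public static void main(String[] args) throws Exception {
		SqlParser parser = SqlParser.create(
				"SELECT * FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId",
				SqlParser.configBuilder().build());
		// parseQuery turns the SQL text into an AST; the top node is a SqlSelect whose
		// FROM clause is a SqlJoin, matching the debug dump shown further below.
		SqlNode ast = parser.parseQuery();
		System.out.println(ast.toString());
	}
}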

package org.apache.flink.table.planner.delegation;

public class ParserImpl implements Parser {
	@Override
	public List<Operation> parse(String statement) {
		CalciteParser parser = calciteParserSupplier.get();
		FlinkPlannerImpl planner = validatorSupplier.get();
    
		// parse the sql query
		SqlNode parsed = parser.parse(statement);

		Operation operation = SqlToOperationConverter.convert(planner,parsed)
			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
		return Collections.singletonList(operation);
	}  
}  

// Printing the parsed result shows the basic shape of a SqlNode.
  
parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
 operator = {SqlSetOperator@4716} "UNION ALL"
  all = true
  name = "UNION ALL"
  kind = {SqlKind@4742} "UNION"
  leftPrec = 14
  rightPrec = 15
  returnTypeInference = {ReturnTypes$lambda@4743} 
  operandTypeInference = null
  operandTypeChecker = {SetopOperandTypeChecker@4744} 
 operands = {SqlNode[2]@4717} 
  0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2"
  1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
 functionQuantifier = null
 expanded = false
 pos = {SqlParserPos@4719} "line 2,column 1"  
    
// The related debugging stack, which helps to understand this in depth
    
SqlStmt:3208,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmtEof:3732,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseSqlStmtEof:234,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseQuery:160,SqlParser (org.apache.calcite.sql.parser)
parseStmt:187,SqlParser (org.apache.calcite.sql.parser)
parse:48,CalciteParser (org.apache.flink.table.planner.calcite)
parse:64,StreamSQLExample (spendreport)
  
// Another example, in FlinkSqlParserImpl.FromClause
    
e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`"
 left = {SqlBasicCall@4676} "`Orders` AS `o`"
  operator = {SqlAsOperator@4752} "AS"
  operands = {SqlNode[2]@4753} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4755} "line 7,column 3"
 natural = {SqlLiteral@4677} "FALSE"
  typeName = {SqlTypeName@4775} "BOOLEAN"
  value = {Boolean@4776} false
  pos = {SqlParserPos@4777} "line 7,column 13"
 joinType = {SqlLiteral@4678} "LEFT"
  typeName = {SqlTypeName@4758} "SYMBOL"
  value = {JoinType@4759} "LEFT"
  pos = {SqlParserPos@4724} "line 7,column 26"
 right = {SqlBasicCall@4679} "`Payment` AS `p`"
  operator = {SqlAsOperator@4752} "AS"
  operands = {SqlNode[2]@4763} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4764} "line 7,column 31"
 conditionType = {SqlLiteral@4680} "ON"
  typeName = {SqlTypeName@4758} "SYMBOL"
  value = {JoinConditionType@4771} "ON"
  pos = {SqlParserPos@4772} "line 7,column 44"
 condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`"
  operator = {SqlBinaryOperator@4766} "="
  operands = {SqlNode[2]@4767} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4768} "line 7,column 47"
 pos = {SqlParserPos@4724} "line 7,column 26" 
        
// The related debugging stack, which helps to understand this in depth
    
FromClause:10192,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlSelect:5918,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQuery:630,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQueryOrExpr:15651,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
QueryOrExpr:15118,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
OrderedQueryOrExpr:504,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmt:3693,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmtEof:3732,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseSqlStmtEof:234,FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseQuery:160,CalciteParser (org.apache.flink.table.planner.calcite)
parse:64,ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464,TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73,SimpleOuterJoin$ (spendreport)
main:-1,SimpleOuterJoin (spendreport)  

2. SqlNode validation (SqlNode -> SqlNode)

The first step above produces a SqlNode object, an unvalidated abstract syntax tree. Next comes the validation phase. Validation needs the metadata, and it checks table names, field names, function names, data types, and so on.

In other words: validation against the metadata; after validation the AST is still represented by SqlNode.
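
A quick way to see what this phase adds is to feed the validator a query with a misspelled column. This is a hedged sketch with Calcite's Planner facade; Flink performs the same step in FlinkPlannerImpl.validate, using the catalog as the metadata source.

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.ValidationException;

public class ValidateSketch {
	public static void main(String[] args) throws Exception {
		Planner planner = Frameworks.getPlanner(
				Frameworks.newConfigBuilder()
						.defaultSchema(Frameworks.createRootSchema(true))
						.build());

		// Parsing succeeds: the parser only checks the grammar, not the metadata.
		SqlNode parsed = planner.parse("SELECT o.amout FROM (VALUES (1)) AS o(amount)");
		try {
			planner.validate(parsed);
		} catch (ValidationException e) {
			// The validator resolves fields against the metadata and rejects the typo 'amout'.
			System.out.println(e.getMessage());
		}
	}
}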

package org.apache.flink.table.planner.operations;

public class SqlToOperationConverter {
	public static Optional<Operation> convert(
			FlinkPlannerImpl flinkPlanner,
			CatalogManager catalogManager,
			SqlNode sqlNode) {
		// validate() is invoked here
		final SqlNode validated = flinkPlanner.validate(sqlNode);
		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
		......
	}
}
    
// Printing the validated result:
    
validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`,`UnnamedTable$0`.`product`,`UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`,`OrderB`.`product`,`OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
 operator = {SqlSetOperator@5000} "UNION ALL"
  all = true
  name = "UNION ALL"
  kind = {SqlKind@5029} "UNION"
  leftPrec = 14
  rightPrec = 15
  returnTypeInference = {ReturnTypes$lambda@5030} 
  operandTypeInference = null
  operandTypeChecker = {SetopOperandTypeChecker@5031} 
 operands = {SqlNode[2]@5001} 
  0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`,`UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2"
  1 = {SqlSelect@5026} "SELECT `OrderB`.`user`,`OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
 functionQuantifier = null
 expanded = false
 pos = {SqlParserPos@5003} "line 2,column 1"    
    
// The related debugging stack, which helps to understand this in depth
    
validate:81,AbstractNamespace (org.apache.calcite.sql.validate)
validateNamespace:1008,SqlValidatorImpl (org.apache.calcite.sql.validate)
validateQuery:968,SqlValidatorImpl (org.apache.calcite.sql.validate)
validateCall:90,SqlSetOperator (org.apache.calcite.sql)
validateCall:5304,SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:116,SqlCall (org.apache.calcite.sql)
validateScopedExpression:943,SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:650,SqlValidatorImpl (org.apache.calcite.sql.validate)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate:126,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
validate:105,StreamSQLExample (spendreport)    

3. Semantic analysis (SqlNode -> RelNode/RexNode)

In the outline diagram we have now reached this point:

// NOTE: execution order is from top to bottom; " -----> " marks the type of instance produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase: generates the AST (abstract syntax tree), SQL -> SqlNode
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, SqlNode -> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree, unoptimized RelNode
*        |      
*        |     

After the second step, the SqlNode tree has passed validation. This step converts the SqlNode into RelNode/RexNode, i.e. it generates the corresponding logical plan.

In other words: semantic analysis builds the RelNode tree from the SqlNode and the metadata; this is the initial version of the logical plan.

The Flink-side logical plan that has been produced is converted into Calcite's logical plan, so that Calcite's powerful optimization rules can be applied.

Flink calls the construct method of each node from top to bottom, converting Flink nodes into Calcite RelNode nodes. The real work is done in the convertQueryRecursive() method.

For instance, the call chain that produces a LogicalProject looks roughly like this:

createJoin:378,RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
createJoin:2520,SqlToRelConverter (org.apache.calcite.sql2rel)
convertFrom:2111,SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:646,SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627,SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181,SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563,SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522,SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436,SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154,SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66,SimpleOuterJoin (spendreport)

The detailed source code is as follows:

convertQuery() in SqlToRelConverter converts the SqlNode into a RelRoot:
  
public class SqlToRelConverter {  
    public RelRoot convertQuery(SqlNode query,boolean needsValidation,boolean top) {
        if (needsValidation) {
            query = this.validator.validate(query);
        }
        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider()));
        RelNode result = this.convertQueryRecursive(query,top,(RelDataType)null).rel;
        if (top && isStream(query)) {
            result = new LogicalDelta(this.cluster,((RelNode)result).getTraitSet(),(RelNode)result);
        }

        RelCollation collation = RelCollations.EMPTY;
        if (!query.isA(SqlKind.DML) && isOrdered(query)) {
            collation = this.requiredCollation((RelNode)result);
        }

        this.checkConvertedType(query,(RelNode)result);

        RelDataType validatedRowType = this.validator.getValidatedNodeType(query);
      
        // The root is set here
        return RelRoot.of((RelNode)result,validatedRowType,query.getKind()).withCollation(collation);
    }
}

// Printed at this point:
toQueryOperation:523,SqlToOperationConverter (org.apache.flink.table.planner.operations)

// The result below shows the real structure of a RelRoot
  
relational = {RelRoot@5248} "Root {kind: UNION,rel: LogicalUnion#6,rowType: RecordType(BIGINT user,VARCHAR(2147483647) product,INTEGER amount),fields: [<0,user>,<1,product>,<2,amount>],collation: []}"
 rel = {LogicalUnion@5227} "LogicalUnion#6"
  inputs = {RegularImmutableList@5272}  size = 2
  kind = {SqlKind@5029} "UNION"
  all = true
  desc = "LogicalUnion#6"
  rowType = {RelRecordType@5238} "RecordType(BIGINT user,INTEGER amount)"
  digest = "LogicalUnion#6"
  cluster = {FlinkRelOptCluster@4800} 
  id = 6
  traitSet = {RelTraitSet@5273}  size = 5
 validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user,INTEGER amount)"
  kind = {StructKind@5268} "FULLY_QUALIFIED"
  nullable = false
  fieldList = {RegularImmutableList@5269}  size = 3
  digest = "RecordType(BIGINT user,VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product,INTEGER amount) NOT NULL"
 kind = {SqlKind@5029} "UNION"
  lowerName = "union"
  sql = "UNION"
  name = "UNION"
  ordinal = 18
 fields = {RegularImmutableList@5254}  size = 3
  {Integer@5261} 0 -> "user"
  {Integer@5263} 1 -> "product"
  {Integer@5265} 2 -> "amount"
 collation = {RelCollationImpl@5237} "[]"
  fieldCollations = {RegularImmutableList@5256}  size = 0

// Call stack
    
convertQuery:561,SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148,FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135,StreamSQLExample (spendreport)
  
// Another example: the LogicalProject that was generated
  
bb = {SqlToRelConverter$Blackboard@4978} 
 scope = {SelectScope@4977} 
 nameToNodeMap = null
 root = {LogicalProject@5100} "LogicalProject#4"
  exps = {RegularImmutableList@5105}  size = 3
  input = {LogicalJoin@5106} "LogicalJoin#3"
  desc = "LogicalProject#4"
  rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId,VARCHAR(2147483647) productName,VARCHAR(2147483647) payType)"
  digest = "LogicalProject#4"
  cluster = {FlinkRelOptCluster@4949} 
  id = 4
  traitSet = {RelTraitSet@5108}  size = 5
 inputs = {Collections$SingletonList@5111}  size = 1
 mapCorrelateToRex = {HashMap@5112}  size = 0
 isPatternVarRef = false
 cursors = {ArrayList@5113}  size = 0
 subQueryList = {LinkedHashSet@5114}  size = 0
 agg = null
 window = null
 mapRootRelToFieldProjection = {HashMap@5115}  size = 0
 columnMonotonicities = {ArrayList@5116}  size = 3
 systemFieldList = {ArrayList@5117}  size = 0
 top = true
 initializerExpressionFactory = {NullInitializerExpressionFactory@5118} 
 this$0 = {SqlToRelConverter@4926} 

// For example, the LogicalProject is generated here.

   protected void convertFrom(SqlToRelConverter.Blackboard bb,SqlNode from) {
        switch (from.getKind()) {
            ......
            case JOIN:
                RelNode joinRel = this.createJoin(fromBlackboard,leftRel,rightRel,conditionExp,convertedJoinType);
                bb.setRoot(joinRel,false);
            ......
        }
   }
  
// Related call stack

createJoin:378,SimpleOuterJoin (spendreport)

4. Optimization (RelNode -> RelNode)

At this point the outline diagram has reached:

// NOTE: execution order is from top to bottom; " -----> " marks the type of instance produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase: generates the AST (abstract syntax tree), SQL -> SqlNode
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, SqlNode -> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree, unoptimized RelNode
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
*    VolcanoRuleCall.onMatch // optimizes the logical plan with Flink-specific rules
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // optimized logical plan
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
*    VolcanoRuleCall.onMatch // converts the optimized logical plan into Flink's physical plan using Flink rules
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // stream physical RelNode, the physical plan
*        |      
*        |     

The fourth phase is where the heart of Calcite lies.

In other words: logical plan optimization, the core of the optimizer; the logical plan generated earlier is optimized according to the relevant rules.

In Flink this part is uniformly encapsulated in the optimize method. It involves several phases, and each phase uses rules to optimize and improve the logical plan.

What the optimizer does

The most central part of the Calcite architecture is the Optimizer. An optimization engine consists of three components:

  • rules: the matching rules; Calcite ships with hundreds of built-in rules for optimizing relational expressions, and custom rules are supported as well;
  • metadata providers: they supply information that helps steer the optimizer toward its goal (reducing the overall cost), such as row counts or which column of a table is unique, as well as functions for computing the cost of executing a subexpression in the RelNode tree;
  • planner engines: their main job is to keep firing rules until a given goal is reached; for a cost-based optimizer (CBO), for example, the goal is to reduce cost (cost covers the number of rows processed, CPU cost, IO cost, and so on).

The optimizer's job is to turn the relational algebra expression produced by the parser into an execution plan for the execution engine, applying rule-based optimizations along the way to obtain a more efficient plan. A typical optimization is filter push-down: running the filter before the join means the join no longer has to process its full input, which reduces the amount of data taking part in the join.

RelOptPlanner is the base class of Calcite's optimizers. Calcite provides two implementations (a short HepPlanner sketch follows this list; the VolcanoPlanner is examined in detail afterwards):

  • HepPlanner: the rule-based optimization (RBO) implementation. It is a heuristic optimizer that keeps matching rules until a match-count limit is reached, or until a full pass no longer produces any rule match;
  • VolcanoPlanner: the cost-based optimization (CBO) implementation. It keeps iterating over the rules until it finds the plan with the lowest cost.
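
For contrast with the VolcanoPlanner walkthrough below, the rule-based HepPlanner is driven roughly like this (a hedged sketch: relNode stands for a logical plan such as the one produced in the previous step, and FILTER_ON_JOIN is the filter push-down rule mentioned above):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterJoinRule;

public class HepSketch {
	// Rule-based (RBO) optimization: e.g. push a Filter below a Join.
	static RelNode optimize(RelNode relNode) {
		HepProgram program = new HepProgramBuilder()
				.addRuleInstance(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN)
				.build();
		HepPlanner hepPlanner = new HepPlanner(program);
		hepPlanner.setRoot(relNode);
		// Fires matching rules until a full pass produces no more matches (or the match limit is hit).
		return hepPlanner.findBestExp();
	}
}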

Cost-based optimization (CBO)

A cost-based optimizer (CBO) transforms relational expressions according to optimization rules. A transformation takes one relational expression and produces another while keeping the original, so a series of transformations yields multiple candidate execution plans. The CBO then computes the cost of each plan from statistics and a cost model, and picks the plan with the lowest cost.

CBO therefore has two dependencies: statistics and the cost model. Whether the statistics are accurate and the cost model is reasonable both determine whether the CBO picks the best plan. From the description above, CBO is superior to RBO: RBO only follows rules and is insensitive to the data, whereas in practice the data keeps changing, so a plan produced by RBO may well not be optimal. In fact, major databases and big-data engines all tend toward CBO. For streaming engines, however, CBO is still quite difficult, because information such as data volume cannot be known in advance, which greatly limits the effect of the optimization; CBO is therefore mainly used in offline (batch) scenarios.

VolcanoPlanner concepts

VolcanoPlanner is the CBO implementation; it keeps iterating over the rules until it finds the plan with the lowest cost. Some of its concepts are:

  • RelSet describes a group of equivalent relational expressions; all of its RelNodes are recorded in rels;
  • RelSubset describes a group of equivalent relational expressions with the same physical properties. Each RelSubset records the RelSet it belongs to. RelSubset extends AbstractRelNode, so it is itself a RelNode, with the physical properties stored in its traitSet field. Each RelSubset records its best plan (best) and the cost of that best plan (bestCost).
  • RuleMatch is an abstraction of the relationship between a Rule and a RelSubset; it records information about both.
  • importance determines the order in which rules are applied during optimization. It is a relative notion; the VolcanoPlanner maintains two kinds of importance, one for RelSubsets and one for RuleMatches.

VolcanoPlanner execution steps

Applying the VolcanoPlanner involves four steps overall:

  1. Initialize the VolcanoPlanner and add the relevant rule matches (including ConverterRules) to the rule-match queue;
  2. Perform equivalence transformations on the RelNode: apply rule matches to transform the plan graph (a rule specifies an operator sub-graph to match and the logic to generate an equivalent, better sub-graph); at this point only the physical properties (Convention) are changed;
  3. Register the RelNode via VolcanoPlanner's setRoot() method and perform the corresponding initialization;
  4. Iterate with the dynamic-programming algorithm until the cost no longer changes or all rule matches in the rule-match queue have been applied; this finds the plan with the lowest cost. The importance of a rule match depends on the cost and depth of the RelNode.

The following example walks through the internal logic of the VolcanoPlanner in detail.

//1. Initialize the VolcanoPlanner and add the relevant rules
VolcanoPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
// add the rules
planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
// add the ConverterRules
planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
//2. Changes a relational expression to an equivalent one with a different set of traits.
RelTraitSet desiredTraits =
    relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
relNode = planner.changeTraits(relNode,desiredTraits);
//3. Register the RelNode via VolcanoPlanner's setRoot method and perform the corresponding initialization
planner.setRoot(relNode);
//4. Find the lowest-cost plan via the dynamic-programming algorithm
relNode = planner.findBestExp();

The related code in Flink is as follows:

public PlannerContext(
			TableConfig tableConfig,FunctionCatalog functionCatalog,CatalogManager catalogManager,CalciteSchema rootSchema,List<RelTraitDef> traitDefs) {
		this.tableConfig = tableConfig;

		this.context = new FlinkContextImpl(
				tableConfig, functionCatalog, catalogManager, this::createSqlExprToRexConverter);

		this.rootSchema = rootSchema;
		this.traitDefs = traitDefs;
		// Make a framework config to initialize the RelOptCluster instance,
		// caution that we can only use the attributes that can not be overwrite/configured
		// by user.
		this.frameworkConfig = createFrameworkConfig();

		// The VolcanoPlanner is used here
		RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(),frameworkConfig.getContext());
		planner.setExecutor(frameworkConfig.getExecutor());
		for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
			planner.addRelTraitDef(traitDef);
		}
		this.cluster = FlinkRelOptClusterFactory.create(planner,new RexBuilder(typeFactory));
	}


// Initialization
<init>:119,PlannerContext (org.apache.flink.table.planner.delegation)
<init>:86,PlannerBase (org.apache.flink.table.planner.delegation)
<init>:44,StreamPlanner (org.apache.flink.table.planner.delegation)
create:50,BlinkPlannerFactory (org.apache.flink.table.planner.delegation)
create:325,StreamTableEnvironmentImpl$ (org.apache.flink.table.api.scala.internal)
create:425,StreamTableEnvironment$ (org.apache.flink.table.api.scala)
main:56,StreamSQLExample (spendreport)


class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {

 override def optimize(root: RelNode,context: OC): RelNode = {
    val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
    // VolcanoPlanner limits that the planer a RelNode tree belongs to and
    // the VolcanoPlanner used to optimize the RelNode tree should be same instance.
    // see: VolcanoPlanner#registerImpl
    // here,use the planner in cluster directly
      
    // The VolcanoPlanner is used here as well
    val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
    val optProgram = Programs.ofRules(rules)
  }
}      
  
// Its call stack

optimize:60,FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62,FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58,FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157,TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157,TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891,AbstractIterable (scala.collection)
foldLeft:157,TraversableOnce$class (scala.collection)
foldLeft:104,AbstractTraversable (scala.collection)
optimize:57,FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170,StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90,StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77,CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248,PlannerBase (org.apache.flink.table.planner.delegation)
translate:151,StreamSQLExample (spendreport)

// Everything below is VolcanoPlanner-related code and call stacks
  
// VolcanoPlanner adds rules; the selected optimization rules are wrapped into VolcanoRuleMatch objects and pushed into the RuleQueue, which is exactly the core class used by the dynamic-programming algorithm that runs next.
public class VolcanoPlanner extends AbstractRelOptPlanner {
      public boolean addRule(RelOptRule rule) {
        ......
      }
}    
        
addRule:438,VolcanoPlanner (org.apache.calcite.plan.volcano)
run:315,Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64,StreamSQLExample (spendreport)        
 
// VolcanoPlanner changes the traits
public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode changeTraits(RelNode rel,RelTraitSet toTraits) {
        assert !rel.getTraitSet().equals(toTraits);

        assert toTraits.allSimple();

        RelSubset rel2 = this.ensureRegistered(rel,(RelNode)null);
        return rel2.getTraitSet().equals(toTraits) ? rel2 : rel2.set.getOrCreateSubset(rel.getCluster(),toTraits.simplify());
    }
}

changeTraits:529,VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324,StreamSQLExample (spendreport)
  
// VolcanoPlanner sets the root
public class VolcanoPlanner extends AbstractRelOptPlanner {  
    public void setRoot(RelNode rel) {
        this.registerMetadataRels();
        this.root = this.registerImpl(rel,(RelSet)null);
        if (this.originalRoot == null) {
            this.originalRoot = rel;
        }

        this.ruleQueue.recompute(this.root);
        this.ensureRootConverters();
    }
}

setRoot:294,VolcanoPlanner (org.apache.calcite.plan.volcano)
run:326,StreamSQLExample (spendreport)
  
// VolcanoPlanner finds the minimum cost; essentially this is an implementation of a dynamic-programming algorithm.
  
public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode findBestExp() {
        this.ensureRootConverters();
        this.registerMaterializations();
        int cumulativeTicks = 0;
        VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values();
        int var3 = var2.length;

        for(int var4 = 0; var4 < var3; ++var4) {
            VolcanoPlannerPhase phase = var2[var4];
            this.setInitialImportance();
            RelOptCost targetCost = this.costFactory.makeHugeCost();
            int tick = 0;
            int firstFiniteTick = -1;
            int splitCount = 0;
            int giveUpTick = 2147483647;

            while(true) {
                ++tick;
                ++cumulativeTicks;
                if (this.root.bestCost.isLe(targetCost)) {
                    if (firstFiniteTick < 0) {
                        firstFiniteTick = cumulativeTicks;
                        this.clearImportanceBoost();
                    }

                    if (!this.ambitious) {
                        break;
                    }

                    targetCost = this.root.bestCost.multiplyBy(0.9D);
                    ++splitCount;
                    if (this.impatient) {
                        if (firstFiniteTick < 10) {
                            giveUpTick = cumulativeTicks + 25;
                        } else {
                            giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10,25);
                        }
                    }
                } else {
                    if (cumulativeTicks > giveUpTick) {
                        break;
                    }

                    if (this.root.bestCost.isInfinite() && tick % 10 == 0) {
                        this.injectImportanceBoost();
                    }
                }

                VolcanoRuleMatch match = this.ruleQueue.popMatch(phase);
                if (match == null) {
                    break;
                }

                assert match.getRule().matches(match);

                match.onMatch();
                this.root = this.canonize(this.root);
            }

            this.ruleQueue.phaseCompleted(phase);
        }

        RelNode cheapest = this.root.buildCheapestPlan(this);

        return cheapest;
    }
}

// The Flink logical node 'cheapest' obtained by the VolcanoPlanner is the node that is finally chosen

cheapest = {FlinkLogicalUnion@6487} "FlinkLogicalUnion#443"
 cluster = {FlinkRelOptCluster@6224} 
 inputs = {RegularImmutableList@6493}  size = 2
  0 = {FlinkLogicalCalc@6498} "FlinkLogicalCalc#441"
   cluster = {FlinkRelOptCluster@6224} 
   calcProgram = {RexProgram@6509} "(expr#0..2=[{inputs}],expr#3=[2],expr#4=[>($t2,$t3)],proj#0..2=[{exprs}],$condition=[$t4])"
   program = {RexProgram@6509} "(expr#0..2=[{inputs}],$condition=[$t4])"
   input = {FlinkLogicalDataStreamTableScan@6510} "rel#437:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog,default_database,UnnamedTable$0])"
   desc = "FlinkLogicalCalc#441"
   rowType = {RelRecordType@6504} "RecordType(BIGINT user,INTEGER amount)"
   digest = "FlinkLogicalCalc#441"
   AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
   id = 441
   traitSet = {RelTraitSet@5942}  size = 5
  1 = {FlinkLogicalCalc@6499} "FlinkLogicalCalc#442"
   cluster = {FlinkRelOptCluster@6224} 
   calcProgram = {RexProgram@6502} "(expr#0..2=[{inputs}],expr#4=[<($t2,$condition=[$t4])"
   program = {RexProgram@6502} "(expr#0..2=[{inputs}],$condition=[$t4])"
   input = {FlinkLogicalDataStreamTableScan@6503} "rel#435:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog,OrderB])"
   desc = "FlinkLogicalCalc#442"
   rowType = {RelRecordType@6504} "RecordType(BIGINT user,INTEGER amount)"
   digest = "FlinkLogicalCalc#442"
   AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
   id = 442
   traitSet = {RelTraitSet@5942}  size = 5
 kind = {SqlKind@6494} "UNION"
  lowerName = "union"
  sql = "UNION"
  name = "UNION"
  ordinal = 18
 all = true
 desc = "FlinkLogicalUnion#443"
 rowType = null
 digest = "FlinkLogicalUnion#443"
 AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
 id = 443
 traitSet = {RelTraitSet@5942}  size = 5


findBestExp:572,VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327,StreamSQLExample (spendreport)

The following concerns the optimization of the join:

class FlinkLogicalJoin(
    cluster: RelOptCluster,traitSet: RelTraitSet,left: RelNode,right: RelNode,condition: RexNode,joinType: JoinRelType)
  extends FlinkLogicalJoinBase(cluster, traitSet, left, right, condition, joinType) {

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
    val newLeft = RelOptRule.convert(join.getLeft,FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight,FlinkConventions.LOGICAL)

    new FlinkLogicalJoin(
      rel.getCluster,traitSet,newLeft,newRight,join.getCondition,join.getJoinType)
  }
}  

call = {VolcanoRuleMatch@6191} "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0,$2),joinType=left)]"
 targetSet = {RelSet@6193} 
 targetSubset = null
 digest = "rule [FlinkLogicalJoinConverter(in:NONE,joinType=left)]"
 cachedImportance = 0.8019000000000001
 volcanoPlanner = {VolcanoPlanner@6194} 
 generatedRelList = null
 id = 71
 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6186} 
  parent = null
  rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
  predicate = {ConverterRule$lambda@6246} 
  solveOrder = {int[1]@6247} 
  ordinalInParent = 0
  ordinalInRule = 0
  trait = {Convention$Impl@6184} "NONE"
  clazz = {Class@5010} "class org.apache.calcite.rel.logical.LogicalJoin"
  children = {RegularImmutableList@6230}  size = 0
  childPolicy = {RelOptRuleOperandChildPolicy@6248} "ANY"
 nodeInputs = {RegularImmutableBiMap@6195}  size = 0
 rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
 rels = {RelNode[1]@6196} 
  0 = {LogicalJoin@6181} "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,joinType=left)"
   semiJoinDone = false
   systemFieldList = {RegularImmutableList@6230}  size = 0
   condition = {RexCall@6231} "=($0,$2)"
   variablesSet = {RegularImmutableSet@6232}  size = 0
   joinType = {JoinRelType@6233} "LEFT"
   joinInfo = {JoinInfo@6234} 
   left = {RelSubset@6235} "rel#98:Subset#0.NONE.any.None: 0.false.UNKNOWN"
   right = {RelSubset@6236} "rel#99:Subset#1.NONE.any.None: 0.false.UNKNOWN"
   desc = "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,joinType=left)"
   rowType = {RelRecordType@6237} "RecordType(VARCHAR(2147483647) orderId,VARCHAR(2147483647) orderId0,VARCHAR(2147483647) payType)"
   digest = "LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,joinType=left)"
   cluster = {FlinkRelOptCluster@6239} 
   id = 100
   traitSet = {RelTraitSet@6240}  size = 5
 planner = {VolcanoPlanner@6194} 
 parents = null  
  
// Call stack at creation time
   
create:106,FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical)
convert:92,FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical)
onMatch:144,ConverterRule (org.apache.calcite.rel.convert)
onMatch:208,VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631,FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62,FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58,FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157,FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170,StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90,StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77,CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248,PlannerBase (org.apache.flink.table.planner.delegation)
translate:151,PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146,TableConversions (org.apache.flink.table.api.scala)
main:75,SimpleOuterJoin (spendreport)
  
abstract class FlinkLogicalJoinBase(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    left: RelNode,
    right: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends Join(
    cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
  with FlinkLogicalRel {
    
  // The cost is also computed here
  override def computeSelfCost(planner: RelOptPlanner,mq: RelMetadataQuery): RelOptCost = {
    val leftRowCnt = mq.getRowCount(getLeft)
    val leftRowSize = mq.getAverageRowSize(getLeft)
    val rightRowCnt = mq.getRowCount(getRight)

    joinType match {
      case JoinRelType.SEMI | JoinRelType.ANTI =>
        val rightRowSize = mq.getAverageRowSize(getRight)
        val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
        val cpuCost = leftRowCnt + rightRowCnt
        val rowCnt = leftRowCnt + rightRowCnt
        planner.getCostFactory.makeCost(rowCnt,cpuCost,ioCost)
      case _ =>
        val cpuCost = leftRowCnt + rightRowCnt
        val ioCost = (leftRowCnt * leftRowSize) + rightRowCnt
        planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)
    }
  }  
}  
  
// Call stack
  
computeSelfCost:63,FlinkLogicalJoin (org.apache.flink.table.planner.plan.nodes.logical)
getNonCumulativeCost:41,FlinkRelMdNonCumulativeCost (org.apache.flink.table.planner.plan.metadata)
getNonCumulativeCost_$:-1,GeneratedMetadataHandler_NonCumulativeCost
getNonCumulativeCost:-1,GeneratedMetadataHandler_NonCumulativeCost
getNonCumulativeCost:301,RelMetadataQuery (org.apache.calcite.rel.metadata)
getCost:936,VolcanoPlanner (org.apache.calcite.plan.volcano)
propagateCostImprovements0:347,RelSubset (org.apache.calcite.plan.volcano)
propagateCostImprovements:330,RelSubset (org.apache.calcite.plan.volcano)
addRelToSet:1828,VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1764,VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846,VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868,VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:1939,VolcanoPlanner (org.apache.calcite.plan.volcano)
transformTo:129,VolcanoRuleCall (org.apache.calcite.plan.volcano)
transformTo:236,RelOptRuleCall (org.apache.calcite.plan)
onMatch:146,SimpleOuterJoin (spendreport)  

Optimization rules

Calcite optimizes these logical plans based on optimization rules; different rules are applied depending on the runtime environment (Flink provides one set of rules for batch and one for streaming).

The rules fall into two categories: built-in Calcite optimization rules (such as predicate push-down and pruning), and rules that convert logical nodes into Flink nodes.

Both of these steps belong to Calcite's optimization phase. The resulting DataStream plan encapsulates how each node is translated into the corresponding DataStream / DataSet program: the various DataStream/DataSet nodes are translated via code generation (CodeGen) into the final executable DataStream/DataSet program.

The different rules are listed below; each rule produces one corresponding physical node. Inside such a node, based on the execution steps Calcite derived for the SQL, code generation produces the DataSet execution function code.

package org.apache.flink.table.plan.rules
  
  /**
    * RuleSet to optimize plans for batch / DataSet execution
    */
  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
    // translate to Flink DataSet nodes
    DataSetWindowAggregateRule.INSTANCE,DataSetAggregateRule.INSTANCE,DataSetDistinctRule.INSTANCE,DataSetCalcRule.INSTANCE,DataSetPythonCalcRule.INSTANCE,DataSetJoinRule.INSTANCE,DataSetSingleRowJoinRule.INSTANCE,DataSetScanRule.INSTANCE,DataSetUnionRule.INSTANCE,DataSetIntersectRule.INSTANCE,DataSetMinusRule.INSTANCE,DataSetSortRule.INSTANCE,DataSetValuesRule.INSTANCE,DataSetCorrelateRule.INSTANCE,BatchTableSourceScanRule.INSTANCE
  )
  
   /**
    * RuleSet to optimize plans for stream / DataStream execution
    */
  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
    // translate to DataStream nodes
    DataStreamSortRule.INSTANCE,DataStreamGroupAggregateRule.INSTANCE,DataStreamOverAggregateRule.INSTANCE,DataStreamGroupWindowAggregateRule.INSTANCE,DataStreamCalcRule.INSTANCE,DataStreamScanRule.INSTANCE,DataStreamUnionRule.INSTANCE,DataStreamValuesRule.INSTANCE,DataStreamCorrelateRule.INSTANCE,DataStreamWindowJoinRule.INSTANCE,DataStreamJoinRule.INSTANCE,DataStreamTemporalTableJoinRule.INSTANCE,StreamTableSourceScanRule.INSTANCE,DataStreamMatchRule.INSTANCE,DataStreamTableAggregateRule.INSTANCE,DataStreamGroupWindowTableAggregateRule.INSTANCE,DataStreamPythonCalcRule.INSTANCE
  )
    
package org.apache.flink.table.planner.plan.rules

  /**
    * RuleSet to do physical optimize for stream
    */
  val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
    FlinkExpandConversionRule.STREAM_INSTANCE,
    // source
    StreamExecDataStreamScanRule.INSTANCE,StreamExecTableSourceScanRule.INSTANCE,StreamExecIntermediateTableScanRule.INSTANCE,StreamExecWatermarkAssignerRule.INSTANCE,StreamExecValuesRule.INSTANCE,
    // calc
    StreamExecCalcRule.INSTANCE,StreamExecPythonCalcRule.INSTANCE,
    // union
    StreamExecUnionRule.INSTANCE,
    // sort
    StreamExecSortRule.INSTANCE,StreamExecLimitRule.INSTANCE,StreamExecSortLimitRule.INSTANCE,StreamExecTemporalSortRule.INSTANCE,
    // rank
    StreamExecRankRule.INSTANCE,StreamExecDeduplicateRule.RANK_INSTANCE,
    // expand
    StreamExecExpandRule.INSTANCE,
    // group agg
    StreamExecGroupAggregateRule.INSTANCE,StreamExecGroupTableAggregateRule.INSTANCE,
    // over agg
    StreamExecOverAggregateRule.INSTANCE,
    // window agg
    StreamExecGroupWindowAggregateRule.INSTANCE,StreamExecGroupWindowTableAggregateRule.INSTANCE,
    // join
    StreamExecJoinRule.INSTANCE,StreamExecWindowJoinRule.INSTANCE,StreamExecTemporalJoinRule.INSTANCE,StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
    // CEP
    StreamExecMatchRule.INSTANCE,
    // correlate
    StreamExecConstantTableFunctionScanRule.INSTANCE,StreamExecCorrelateRule.INSTANCE,
    // sink
    StreamExecSinkRule.INSTANCE
  )
StreamExecUnionRule

A concrete rule example, here the rule for Union:

package org.apache.flink.table.planner.plan.rules.physical.stream

class StreamExecUnionRule
  extends ConverterRule(
    classOf[FlinkLogicalUnion],FlinkConventions.LOGICAL,FlinkConventions.STREAM_PHYSICAL,"StreamExecUnionRule") {
  
  def convert(rel: RelNode): RelNode = {
    val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val newInputs = union.getInputs.map(RelOptRule.convert(_,FlinkConventions.STREAM_PHYSICAL))

    // Here this rule produces a corresponding physical node. Inside the node, based on the execution
    // steps Calcite derived for the SQL, code generation will produce the stream execution function code.
    new StreamExecUnion(
      rel.getCluster, traitSet, newInputs, union.all, rel.getRowType)
    }
  }  
} 

public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode findBestExp() {
             // Rules are matched and invoked here
             match.onMatch();
        return cheapest;
    }
}

match = {VolcanoRuleMatch@6252} "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
 targetSet = {RelSet@6298} 
 targetSubset = null
 digest = "rule [StreamExecUnionRule(in:LOGICAL,all=true)]"
 cachedImportance = 0.81
 volcanoPlanner = {VolcanoPlanner@6259} 
 generatedRelList = null
 id = 521
 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6247} 
 nodeInputs = {RegularImmutableBiMap@6299}  size = 0
 rule = {StreamExecUnionRule@6241} "StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)"
 rels = {RelNode[1]@6300} 
 planner = {VolcanoPlanner@6259} 
 parents = null

// Call stack
   
create:106,SimpleOuterJoin (spendreport)
   
  
// Call stack

convert:46,StreamExecUnionRule (org.apache.flink.table.planner.plan.rules.physical.stream)
onMatch:144,StreamSQLExample (spendreport)
StreamExecJoinRule

Another concrete rule example: the join optimization, i.e. how a StreamExecJoin is generated.

class StreamExecJoinRule {
    override def onMatch(call: RelOptRuleCall): Unit = {
      val newJoin = new StreamExecJoin(
        join.getCluster, providedTraitSet, newLeft, newRight, join.getCondition, join.getJoinType)
      call.transformTo(newJoin)
    }
}

newJoin = {StreamExecJoin@6326} "StreamExecJoin#152"
 cluster = {FlinkRelOptCluster@5072} 
 joinType = {JoinRelType@5038} "LEFT"
 LOG = null
 transformation = null
 bitmap$trans$0 = false
 CommonPhysicalJoin.joinType = {JoinRelType@5038} "LEFT"
 filterNulls = null
 keyPairs = null
 flinkJoinType = null
 inputRowType = null
 bitmap$0 = 0
 condition = {RexCall@5041} "=($0,$2)"
 variablesSet = {RegularImmutableSet@6342}  size = 0
 Join.joinType = {JoinRelType@5038} "LEFT"
 joinInfo = {JoinInfo@6343} 
 left = {RelSubset@6328} "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  set = {RelSet@6348} 
  best = null
  timestamp = 0
  boosted = false
  desc = "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  rowType = {RelRecordType@6349} "RecordType(VARCHAR(2147483647) orderId,VARCHAR(2147483647) productName)"
  digest = "Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  cluster = {FlinkRelOptCluster@5072} 
  id = 150
  traitSet = {RelTraitSet@6336}  size = 5
 right = {RelSubset@6329} "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  set = {RelSet@6345} 
  best = null
  timestamp = 0
  boosted = false
  desc = "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  rowType = null
  digest = "Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  cluster = {FlinkRelOptCluster@5072} 
  id = 151
  traitSet = {RelTraitSet@6336}  size = 5
 desc = "StreamExecJoin#152"
 rowType = null
 digest = "StreamExecJoin#152"
 AbstractRelNode.cluster = {FlinkRelOptCluster@5072} 
 id = 152
 traitSet = {RelTraitSet@6327}  size = 5

// Call stack
   
<init>:58,StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
onMatch:128,StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
onMatch:208,SimpleOuterJoin (spendreport)

5. Generating the ExecutionPlan

At this point the outline diagram looks like this:

// NOTE : 执行顺序是从上至下," -----> " 表示生成的实例类型
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL 解析阶段,生成AST(抽象语法树),作用是SQL–>SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // 语义分析,生成逻辑计划,作用是SqlNode–>RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未优化的RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink定制的优化rules      
*    VolcanoRuleCall.onMatch // 基于Flink定制的一些优化rules去优化 Logical Plan 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan,逻辑执行计划
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // 基于Flink rules将optimized LogicalPlan转成Flink物理执行计划
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理执行计划
*        |      
*        |   
*    StreamExecJoin.translateToPlanInternal  // 作用是生成 StreamOperator,即Flink算子  
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |  

Calcite maps the optimized plan onto the target big-data engine; for Flink this means turning it into a Flink job graph.

This step mainly consists of recursively calling translateToPlan on each DataStreamRel node; that method uses code generation (CodeGen) to produce the various Flink operators. At this point we effectively have the same program we would have written by hand against the DataSet or DataStream API.

class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager)
  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  override protected def translateToPlan(
      execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
    execNodes.map {
      case node: StreamExecNode[_] => node.translateToPlan(this)
      case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
    }
  }
}

package org.apache.flink.table.planner.plan.nodes.physical.stream

class StreamExecUnion(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRels: util.List[RelNode],
    all: Boolean,
    outputRowType: RelDataType)
  extends Union(cluster, traitSet, inputRels, all)
  with StreamPhysicalRel
  with StreamExecNode[BaseRow] {

  // This is where the Flink transformation (i.e. the operator) is produced
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = {
    val transformations = getInputNodes.map {
      input => input.translateToPlan(planner).asInstanceOf[Transformation[BaseRow]]
    }
    new UnionTransformation(transformations)
  }
}

 // Call stack

translateToPlanInternal:85,StreamSQLExample (spendreport)
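
To give the translateToPlanInternal / code-generation step something concrete to picture, the toy sketch below compiles a tiny expression tree into an executable row function. It is only an analogy: ToyCodeGen, Expr, Field and Literal are hypothetical names, while Flink's real CodeGenerator emits Java source per operator and compiles it with Janino.

// Toy analogy for code generation on a Calc-style node: an expression tree is
// turned into an executable filter over rows. All names here are hypothetical.
object ToyCodeGen {
  type Row = Array[Int]

  sealed trait Expr
  case class Field(index: Int)                    extends Expr  // reference a column
  case class Literal(value: Int)                  extends Expr
  case class GreaterThan(left: Expr, right: Expr) extends Expr

  // "Generate" a filter function from the predicate tree.
  def compileFilter(e: Expr): Row => Boolean = e match {
    case GreaterThan(l, r) =>
      val lf = compileValue(l)
      val rf = compileValue(r)
      row => lf(row) > rf(row)
    case other => throw new IllegalArgumentException(s"unsupported predicate: $other")
  }

  private def compileValue(e: Expr): Row => Int = e match {
    case Field(i)   => row => row(i)
    case Literal(v) => _ => v
    case other => throw new IllegalArgumentException(s"unsupported expression: $other")
  }

  def main(args: Array[String]): Unit = {
    // WHERE amount > 2, with amount in column 2, mirroring the UNION example below
    val filter = compileFilter(GreaterThan(Field(2), Literal(2)))
    println(filter(Array(1, 0, 3)))  // true
    println(filter(Array(2, 0, 1)))  // false
  }
}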

6. Runtime

At this point the outline diagram is complete.

// NOTE : 执行顺序是从上至下," -----> " 表示生成的实例类型
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL 解析阶段,生成AST(抽象语法树),作用是SQL–>SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // 语义分析,生成逻辑计划,作用是SqlNode–>RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未优化的RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink定制的优化rules      
*    VolcanoRuleCall.onMatch // 基于Flink定制的一些优化rules去优化 Logical Plan 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan,逻辑执行计划
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // 基于Flink rules将optimized LogicalPlan转成Flink物理执行计划
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理执行计划
*        |      
*        |     
*    StreamExecJoin.translateToPlanInternal  // 作用是生成 StreamOperator,即Flink算子  
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |       
*    StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask调用StreamingJoinOperator,真实的执行  
*        |
*        |  

At runtime, the actual processing happens inside a StreamTask, which is the familiar DataStream runtime path. An example call stack is shown below (a rough sketch of the mailbox loop it refers to follows the stack):

processElement:150,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:69,StreamOneInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311,StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1,354713989 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$710)
runMailboxLoop:187,MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487,StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470,StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707,Task (org.apache.flink.runtime.taskmanager)
run:532,Task (org.apache.flink.runtime.taskmanager)
run:748,Thread (java.lang)
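
The frames MailboxProcessor.runMailboxLoop -> StreamTask.processInput -> operator above can be pictured as a single-threaded loop like the rough sketch below. Mail, InputProcessor and MailboxLoopSketch are hypothetical stand-ins, not Flink classes; the real mailbox also handles priorities, yielding and checkpoint triggers.

// Very rough, hypothetical model of the StreamTask mailbox loop: between input
// records the task first drains control "mail" (checkpoints, timers, ...), then
// runs the default action, i.e. pulls one record and pushes it through the operators.
object MailboxLoopSketch {
  type Mail = () => Unit

  trait InputProcessor {
    /** Process one unit of input; return false when the input is exhausted. */
    def processInput(): Boolean
  }

  def runMailboxLoop(mailbox: java.util.ArrayDeque[Mail], input: InputProcessor): Unit = {
    var running = true
    while (running) {
      while (!mailbox.isEmpty) mailbox.poll().apply()   // 1. drain control mail
      running = input.processInput()                    // 2. default action: process input
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Iterator("001,iphone", "002,mac")
    val input = new InputProcessor {
      def processInput(): Boolean =
        if (records.hasNext) { println(s"processElement(${records.next()})"); true }
        else false
    }
    runMailboxLoop(new java.util.ArrayDeque[Mail](), input)
  }
}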

0x05 Code example: UNION

The code below shows how the various execution plans are actually produced.

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

object StreamSQLExample {

  // *************************************************************************
  //     PROGRAM
  // *************************************************************************
  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))

    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))

    // convert DataStream to Table
    val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    // register DataStream as Table
    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = tEnv.sqlQuery(
      s"""
         |SELECT * FROM $tableA WHERE amount > 2
         |UNION ALL
         |SELECT * FROM OrderB WHERE amount < 2
        """.stripMargin)

    result.toAppendStream[Order].print()
    print(tEnv.explain(result))
    env.execute()
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************
  case class Order(user: Long, product: String, amount: Int)
}

The overall plan transformation looks roughly like this:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(user=[$0], product=[$1], amount=[$2])
:  +- LogicalFilter(condition=[>($2, 2)])
:     +- LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
+- LogicalProject(user=[$0], product=[$1], amount=[$2])
   +- LogicalFilter(condition=[<($2, 2)])
      +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

== Optimized Logical Plan ==
Union(all=[true], union=[user, product, amount])
:- Calc(select=[user, product, amount], where=[>(amount, 2)])
:  +- DataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
+- Calc(select=[user, product, amount], where=[<(amount, 2)])
   +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 10 : Operator
		content : SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[user, product, amount])
		ship_strategy : FORWARD

		Stage 11 : Operator
			content : Calc(select=[user, product, amount], where=[(amount > 2)])
			ship_strategy : FORWARD

			Stage 12 : Operator
				content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
				ship_strategy : FORWARD

				Stage 13 : Operator
					content : Calc(select=[user, product, amount], where=[(amount < 2)])
					ship_strategy : FORWARD

0x06 Code example: OUTER JOIN

import java.sql.Timestamp
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object SimpleOuterJoin {
  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // build the Orders data
    val ordersData = new mutable.MutableList[(String, String)]
    ordersData.+=(("001", "iphone"))
    ordersData.+=(("002", "mac"))
    ordersData.+=(("003", "book"))
    ordersData.+=(("004", "cup"))

    // build the Payment data
    val paymentData = new mutable.MutableList[(String, String)]
    paymentData.+=(("001", "alipay"))
    paymentData.+=(("002", "card"))
    paymentData.+=(("003", "card"))
    paymentData.+=(("004", "alipay"))
    val orders = env
      .fromCollection(ordersData)
      .toTable(tEnv, 'orderId, 'productName)
    val ratesHistory = env
      .fromCollection(paymentData)
      .toTable(tEnv, 'orderId, 'payType)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType
        |FROM
        |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toRetractStream[Row]
    result.print()
    print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
    env.execute()
  }
}

The overall transformation is as follows:

== Abstract Syntax Tree ==
LogicalProject(orderId=[$0], productName=[$1], payType=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[left])
   :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])

== Optimized Logical Plan ==
Calc(select=[orderId, productName, payType])
+- Join(joinType=[LeftOuterJoin], where=[=(orderId, orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[orderId]])
   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName])
   +- Exchange(distribution=[hash[orderId]])
      +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 11 : Operator
		content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName])
		ship_strategy : FORWARD

		Stage 13 : Operator
			content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType])
			ship_strategy : FORWARD

			Stage 15 : Operator
				content : Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
				ship_strategy : HASH

				Stage 16 : Operator
					content : Calc(select=[orderId, productName, payType])
					ship_strategy : FORWARD

The printed result is a retract stream: the leading Boolean flag marks an insert (true) or a retraction (false). Order 001 is processed before its payment, so it is first emitted with a null payType, then that row is retracted and the joined row is emitted:
(true,001,iphone,null)
(false,001,iphone,null)
(true,001,iphone,alipay)
(true,002,mac,card)
(true,003,book,card)
(true,004,cup,alipay)

Below are call stacks captured while debugging, for reference.

// Rules being invoked during optimization

matches:49,StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
matchRecurse:263,VolcanoRuleCall (org.apache.calcite.plan.volcano)
matchRecurse:370,VolcanoRuleCall (org.apache.calcite.plan.volcano)
match:247,VolcanoRuleCall (org.apache.calcite.plan.volcano)
fireRules:1534,VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1807,VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90,VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329,AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668,VolcanoPlanner (org.apache.calcite.plan.volcano)
changeTraits:529,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146,TableConversions (org.apache.flink.table.api.scala)
main:75,SimpleOuterJoin (spendreport)
  
  
// Translating the physical nodes into Flink transformations
  
translateToPlanInternal:140,StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:51,StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:51,StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:54,StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39,StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:38,StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48,StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60,StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59,StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234,StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153,SimpleOuterJoin (spendreport)  
 
// At runtime
  
@Internal
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {

	private void processRecord2(
			StreamRecord<IN2> record,
			TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
			Counter numRecordsIn) throws Exception {

		streamOperator.setKeyContextElement2(record);
		streamOperator.processElement2(record);
		postProcessRecord(numRecordsIn);
	}
}

// As the debugger shows, streamOperator is a StreamingJoinOperator (a conceptual sketch of its join logic follows the field dump below)

streamOperator = {StreamingJoinOperator@10943} 
 leftIsOuter = true
 rightIsOuter = false
 outRow = {JoinedRow@10948} "JoinedRow{row1=org.apache.flink.table.dataformat.BinaryRow@dc6a1b67,row2=(+|null,null)}"
 leftNullRow = {GenericRow@10949} "(+|null,null)"
 rightNullRow = {GenericRow@10950} "(+|null,null)"
 leftRecordStateView = {OuterJoinRecordStateViews$InputSideHasNoUniqueKey@10945} 
 rightRecordStateView = {JoinRecordStateViews$InputSideHasNoUniqueKey@10946} 
 generatedJoinCondition = {GeneratedJoinCondition@10951} 
 leftType = {BaseRowTypeInfo@10952} "BaseRow(orderId: STRING,productName: STRING)"
 rightType = {BaseRowTypeInfo@10953} "BaseRow(orderId: STRING,payType: STRING)"
 leftInputSideSpec = {JoinInputSideSpec@10954} "NoUniqueKey"
 rightInputSideSpec = {JoinInputSideSpec@10955} "NoUniqueKey"
 nullFilterKeys = {int[1]@10956} 
 nullSafe = false
 filterAllNulls = true
 minRetentionTime = 0
 stateCleaningEnabled = false
 joinCondition = {AbstractStreamingJoinOperator$JoinConditionWithNullFilters@10947} 
 collector = {TimestampedCollector@10957} 
 chainingStrategy = {ChainingStrategy@10958} "HEAD"
 container = {TwoInputStreamTask@10959} "Join(joinType=[LeftOuterJoin],rightInputSpec=[NoUniqueKey]) -> Calc(select=[orderId,payType]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)"
 config = {StreamConfig@10960} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 2\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Join(joinType=[LeftOuterJoin],rightInputSpec=[NoUniqueKey])-7 -> Calc(select=[orderId,payType])-8,typeNumber=0,selectedNames=[],outputPartitioner=FORWARD,outputTag=null)]\nOperator: SimpleOperatorFactory\nBuffer timeout: 100\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{8=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Calc(select=[orderId,payType])-8 -> SinkConversionToTuple2-9,outputTag=null)]\nOperator: CodeGenOperatorFactory\nBuffer timeout: "
 output = {AbstractStreamOperator$CountingOutput@10961} 
 runtimeContext = {StreamingRuntimeContext@10962} 
 stateKeySelector1 = {BinaryRowKeySelector@10963} 
 stateKeySelector2 = {BinaryRowKeySelector@10964} 
 keyedStateBackend = {HeapKeyedStateBackend@10965} "HeapKeyedStateBackend"
 keyedStateStore = {DefaultKeyedStateStore@10966} 
 operatorStateBackend = {DefaultOperatorStateBackend@10967} 
 metrics = {OperatorMetricGroup@10968} 
 latencyStats = {LatencyStats@10969} 
 processingTimeService = {ProcessingTimeServiceImpl@10970} 
 timeServiceManager = {InternalTimeServiceManager@10971} 
 combinedWatermark = -9223372036854775808
 input1Watermark = -9223372036854775808
 input2Watermark = -9223372036854775808
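
To make the fields above (leftIsOuter, leftRecordStateView, rightRecordStateView, ...) easier to interpret, here is a heavily simplified, in-memory sketch of what an unbounded streaming LEFT OUTER JOIN does per element. LeftOuterJoinSketch and its parameters are hypothetical: there is no keyed state backend, no retraction counting and no state TTL, all of which the real StreamingJoinOperator provides.

// Conceptual sketch only: both sides are buffered forever in local maps, and the
// emit callback receives (isInsert, leftRow, Option(rightRow)).
class LeftOuterJoinSketch[K, L, R](
    leftKey: L => K,
    rightKey: R => K,
    emit: (Boolean, L, Option[R]) => Unit) {

  private val leftState  = scala.collection.mutable.Map[K, List[L]]()
  private val rightState = scala.collection.mutable.Map[K, List[R]]()

  /** Left (outer) element: join against buffered right rows, or emit a null-padded row. */
  def processElement1(l: L): Unit = {
    val k = leftKey(l)
    leftState(k) = l :: leftState.getOrElse(k, Nil)
    val rights = rightState.getOrElse(k, Nil)
    if (rights.isEmpty) emit(true, l, None)             // +  (orderId, productName, null)
    else rights.foreach(r => emit(true, l, Some(r)))    // +  joined rows
  }

  /** Right element: retract earlier null-padded rows for this key, then emit joined rows. */
  def processElement2(r: R): Unit = {
    val k = rightKey(r)
    val firstMatchForKey = rightState.getOrElse(k, Nil).isEmpty
    rightState(k) = r :: rightState.getOrElse(k, Nil)
    leftState.getOrElse(k, Nil).foreach { l =>
      if (firstMatchForKey) emit(false, l, None)        // -  retract the null-padded row
      emit(true, l, Some(r))                            // +  the joined row
    }
  }
}

Feeding the four Orders rows and then the four Payment rows through processElement1 / processElement2 reproduces the insert/retract pattern of the retract stream printed earlier: an order processed before its payment is first emitted null-padded, then retracted and re-emitted once the payment arrives.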
  
// Processing table 1 (the left input)

processElement1:118,StreamingJoinOperator (org.apache.flink.table.runtime.operators.join.stream)
processRecord1:135,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$0:100,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1,169462196 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$733)
emitRecord:362,StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:182,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311,1284793893 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
runMailboxLoop:187,Thread (java.lang)
  
// Processing table 2 (the right input)

processElement2:123,StreamingJoinOperator (org.apache.flink.table.runtime.operators.join.stream)
processRecord2:145,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$1:107,76811487 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$734)
emitRecord:362,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:185,Thread (java.lang)
  
// Processing table 1 (another run)
  
processRecord1:134,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$0:100,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1,230607815 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$735)
emitRecord:362,StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:182,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311,StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1,33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718)
runMailboxLoop:187,MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487,StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470,StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707,Task (org.apache.flink.runtime.taskmanager)
run:532,Task (org.apache.flink.runtime.taskmanager)
run:748,Thread (java.lang)	
  
// Processing table 2 (another run)
  
processRecord2:144,StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$1:107,212261435 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$736)
emitRecord:362,StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:185,Thread (java.lang)  
  

0x07 Code example: WINDOW JOIN

import java.sql.Timestamp
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings,TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable


object SimpleTimeIntervalJoinA {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env,settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>'," +
        "where planner (it is either flink or blink,and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // build the Orders data
    val ordersData = new mutable.MutableList[(String, String, Timestamp)]
    ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
    ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
    ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
    ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))

    // build the Payment data
    val paymentData = new mutable.MutableList[(String, String, Timestamp)]
    paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
    paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
    paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
    paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
    val ratesHistory = env
      .fromCollection(paymentData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType,
        |  o.orderTime,
        |  cast(payTime as timestamp) as payTime
        |FROM
        |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId AND
        | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.print()
    print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
    env.execute()
  }
}

class TimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}

The explain output and the results are as follows:

== Abstract Syntax Tree ==
LogicalProject(orderId=[$0], productName=[$1], payType=[$4], orderTime=[$2], payTime=[CAST($5):TIMESTAMP(6)])
+- LogicalJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, +($2, 3600000:INTERVAL HOUR)))], joinType=[left])
   :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])

== Optimized Logical Plan ==
Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
+- WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(orderId, orderId0), >=(payTime, orderTime), <=(payTime, +(orderTime, 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
   :- Exchange(distribution=[hash[orderId]])
   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName, orderTime])
   +- Exchange(distribution=[hash[orderId]])
      +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType, payTime])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

	Stage 2 : Operator
		content : Timestamps/Watermarks
		ship_strategy : FORWARD

Stage 3 : Data Source
	content : Source: Collection Source

	Stage 4 : Operator
		content : Timestamps/Watermarks
		ship_strategy : FORWARD

		Stage 13 : Operator
			content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName, orderTime])
			ship_strategy : FORWARD

			Stage 15 : Operator
				content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType, payTime])
				ship_strategy : FORWARD

				Stage 17 : Operator
					content : WindowJoin(joinType=[LeftOuterJoin], where=[((orderId = orderId0) AND (payTime >= orderTime) AND (payTime <= (orderTime + 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
					ship_strategy : HASH

					Stage 18 : Operator
						content : Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
						ship_strategy : FORWARD

001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31
003,book,null,2018-12-26T04:53:24,null

Related classes and call stacks:

class StreamExecWindowJoin {
}

class StreamExecWindowJoinRule
  extends ConverterRule(
    classOf[FlinkLogicalJoin],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamExecWindowJoinRule") {
}


matches:54,StreamExecWindowJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
matchRecurse:263,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107,StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101,TableConversions (org.apache.flink.table.api.scala)
main:93,SimpleTimeIntervalJoinA$ (spendreport)
main:-1,SimpleTimeIntervalJoinA (spendreport)
  
  
translateToPlanInternal:136,StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:53,StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58,ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:53,StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:54,SimpleTimeIntervalJoinA (spendreport)   

