Spark 自动转换BroadcastJoin代码分析

背景
Spark在判断能否转为BroadCastJoin时主要是根据输入表的大小是否超过了 spark.sql.autoBroadcastJoinThreshold 参数所配置的大小,如果未超过阈值则可以转为BroadCastJoin.

结论
先说下整个判断的流程:
1.首先在非分区表情况下并且 spark.sql.statistics.fallBackToHdfs此参数开启时会统计表hdfs目录大小
2.在物理计划生成时会统计输入的大小(cbo开启时统计条数 不开启则只统计大小)
3.最终是否走BroadCast会结合如上的两个值进行判断只要满足一个即可

流程分析
代码分支:spark3.2

这里只讨论在转换过程中是如何获取输入大小的。
第一部分来自Analyze阶段对LoginPlan做resolved时,这里Spark内置了一个统计输入的Rule:

  class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  //主要是此方法会进行静态信息的统计
  private def hiveTableWithStats(relation: HiveTableRelation): HiveTableRelation = {
    val table = relation.tableMeta
    val partitionCols = relation.partitionCols
    //如果spark.sql.statistics.fallBackToHdfs设置为True并且此时为非分区表时,则统计此时hdfs相关路径大小作为输入大小
    val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled && partitionCols.isEmpty) {
      try {
        val hadoopConf = session.sessionState.newHadoopConf()
        val tablePath = new Path(table.location)
        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case e: IOException =>
          logWarning("Failed to get table size from HDFS.", e)
          conf.defaultSizeInBytes
      }
    } else {
      //如果是分区表或者在参数为false时,则使用默认值,这里的默认值是 Long.MAX_VALUE 
      //因此默认情况下是不会走BroadCastJoin的
      conf.defaultSizeInBytes
    }

    val stats = Some(Statistics(sizeInBytes = BigInt(sizeInBytes)))
    relation.copy(tableStats = stats)
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case relation: HiveTableRelation
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      hiveTableWithStats(relation)

    // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
    // children, hence not matched directly by previous HiveTableRelation case.
    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      i.copy(table = hiveTableWithStats(relation))
  }
}

第二部分:这里是在生成物理执行计划时所做的操作
这部分可以通过 Strategy的子类 JoinSelection中定义的生成Join策略查看到,其中在判断是否可以转为BroadCastJoin时有如下方法:

 def getBroadcastBuildSide(
      left: LogicalPlan,
      right: LogicalPlan,
      joinType: JoinType,
      hint: JoinHint,
      hintOnly: Boolean,
      conf: SQLConf): Option[BuildSide] = {
      //省略部分代码
      canBroadcastBySize(left, conf) && !hintToNotBroadcastLeft(hint)
      //省略部分代码
    )
  }

再看canBroadcastBySize方法具体实现:

  //这里会从两个地方获取阈值的大小一部分是自适应另外就是上面提到的参数
  //具体是从那个地方获取由plan.stats.isRuntime决定,最终应该是由
  //spark.sql.adaptive.enabled(开启自适应) 参数来决定
  def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    val autoBroadcastJoinThreshold = if (plan.stats.isRuntime) {
      conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
        .getOrElse(conf.autoBroadcastJoinThreshold)
    } else {
      conf.autoBroadcastJoinThreshold
    }
    //这里根据输入的大小来决定是否走BroadCastJoin
    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold
  }

这里继续查看 plan.stats
//这里就是判断是否从Cbo中获取或者另外的方式

 def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }

visit方法:
这里我们在join 时一般都是两个表的join即一般都是HiveTableRelation(LoginalPlan的子类 这里不会绝对是这个,比如自己从内存中创建的表),因此会调用如上两个类的default方法

 def visit(p: LogicalPlan): T = p match {
   //省略部分代码
    case p: Sort => visitSort(p)
    case p: WithCTE => visitWithCTE(p)
    case p: LogicalPlan => default(p)
  }

两者区别在于是否对输入条数做了统计
首先SizeInBytesOnlyStatsPlanVisitor,这里最后调用的是其中的Default方法

  override def default(p: LogicalPlan): Statistics = p match {
    //调用叶子节点的方法计算当前的输入大小
    case p: LeafNode => p.computeStats()
    //遍历所有子节点进行大小的统计 这里未统计条数
    //product方法就是求和
    case _: LogicalPlan =>
      Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
  }

//BasicStatsPlanVisitor

override def default(p: LogicalPlan): Statistics = p match {
    case p: LeafNode => p.computeStats()
    case _: LogicalPlan =>
      val stats = p.children.map(_.stats)
      val rowCount = if (stats.exists(_.rowCount.isEmpty)) {
        None
      } else {
        Some(stats.map(_.rowCount.get).filter(_ > 0L).product)
      }
      //这里统计了rowCount
      Statistics(sizeInBytes = stats.map(_.sizeInBytes).filter(_ > 0L).product, rowCount = rowCount)
  }

上面在统计子节点时,最终都会调用的LeafNode节点,然后调用其computeStats方法
因此这里我们可以到HiveTableRelation中查看具体的实现:

//这里就是取默认的tableStats 即前面第一部分获取的值或者在cbo 开启时进行统计,这里统计完成就会给上面的地方进行最终统计然后来判断是否走BroadCastJoin
//HiveTableRelation

  override def computeStats(): Statistics = {
    tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
      .orElse(tableStats)
      .getOrElse {
      throw QueryExecutionErrors.tableStatsNotSpecifiedError
    }
  }

//ExternalRDD这种就是从内存中创建的

 override def computeStats(): Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussions.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340