一文读懂Spark~~~

文章目录

1、spark概述

什么是spark:

spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

spark与hadoop的差异:

根本差异是多个作业之间的数据通信问题,spark多个作业之间数据通信是基于内存,而hadoop是基于磁盘。

Spark与Hadoop对比

spark的核心模块

img

  1. Spark Core
    Spark Core 中提供了 Spark 最基础与最核心的功能, Spark 其他的功能如: Spark SQL ,Spark Streaming , GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的;

  2. Spark SQL
    Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL ,用户可以使用 SQL或者 Apache Hive 版本的 SQL 方言( HQL )来查询数据;

  3. Spark Streaming
    Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API;

  4. Spark MLlib
    MLlib 是 Spark 提供的一个机器学习算法库。 MLlib 不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语;

  5. Spark GraphX
    GraphX 是 Spark 面向图计算提供的框架与算法库;

Spark 常用部署模式

Spark 作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行 , 在国

内工作中主流的环境为 Yarn ,不过逐渐容器式环境也慢慢流行起来。接下来,我们就分别

看看不同环境下 Spark 的运行

img

spark提供了基于不同环境下的部署模式,本篇针对常用的部署和运行模式,简单做一下总结;

  • Local 模式

    所谓的 Local 模式,就是不需 要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等, 之前在 IDEA 中运行代码的环境我们称之为开发环境,不太一样;

  • Standalone 模式

    local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用 Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone )模式。 Spark 的 Standalone 模式体现了经典的 master-slave 模式。

  • 配置高可用(HA)模式

    所谓的高可用是因为当前集群中的 Master 节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个 Master 节点,一旦处于活动状态的 Master

    发生故障时,由备用 Master 提供服务,保证作业可以继续执行。这里的高可用一般采用

    Zookeeper 设置;

  • Yarn 模式

    独立部署( Standalone )模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主 要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是 和其他专业的资源调度框架集成会更靠谱一些;

  • K8S & Mesos 模式

    Mesos 是 Apache 下的开源分布式资源管理框架,它被称为是分布式系统的内核 , 在 Twitter 得到广泛使用 , 管理着 Twitter 超过 30,0000 台服务器上的应用部署,但是在国内,依 然使用着传统的 Hadoop 大数据框架,所以国内使用 Mesos 框架的并不多,但是原理其实都 差不多;

  • Windows 模式

    在自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程, 并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度, Spark 非常暖心地提供了可以在 windows 系统下启动本地集群的方式,这样,在不使用虚拟 机的情况下,也能学习 Spark 的基本使用;

部署模式对比:

img

Spark相关端口号

  • Spark 查看当前 Spark-shell 运行任务情况端口号: 4040 (计算) ;
  • Spark Master 内部通信服务端口号:7077;
  • Standalone 模式下,Spark Master Web 端口号:8080(资源);
  • Spark 历史服务器端口号:18080;
  • Hadoop YARN 任务运行情况查看端口号:8088;
spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.3.jar 

2、spark运行架构

  • Spark框架的核心是一个计算引擎, 整体来说, 它采用了主-从master-slave的结构
  • 下图是Spark执行时的基本结构,
    • Driver表示master: 负责管理整个集群中的作业任务调度
    • Executor是slave: 负责实际执行任务

在这里插入图片描述

  • Spark Apllication的运行架构由两部分组成: Driver program(SparkContext)Excutor, Spark Application一般都是在集群中运行, 比如Spark Standalone, YARN, Mesos, 这个集群模式给Spark Application 提供了计算资源, 并对这些资源进行管理, 这些资源既可以给executor运行, 也可以给Driver Progam运行.
  • 根据Spark Allication的Driver program是否在资源集群中运行, Spark Application的运行方式又可以分为Cluster模式Client模式;
名称 概念
Application 类似于MR中的概念, 指的是用户编写的Spark应用程序, 内含有一个Driver功能的代码和分布在集群中多个节点上运行的Excutor代码
Driver Program 运行Application 的main()函数并且创建SparkContext, 通常用SparkContext代表Driver Program; 其中创建SparkContext的目的是为了准备Spark应用程序的运行环境; 在Spark中由SparkContext负责和ClusterManager通信, 进行资源的申请, 任务的分配和监控; 当Excutor部分运行完毕后, Driver负责将SparkContext关闭.
Excutor 为某个Application运行在Worker节点上的一个进程, Excutor进程负责运行Task, 并且负责将数据存在内存或者磁盘上; 每个Application都有各自独立的excutors
Cluster Manager 指的是在集群上获取资源的外部服务,目前有: Ø Standalone:Spark原生的资源管理,由Master负责资源的分配;Ø Hadoop Yarn:由YARN中的ResourceManager负责资源的分配;
Worker Node 集群中任何可运行Application代码的节点, 在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点
Job 包含多个Task组成的并行计算, 往往由Spark Action催生, 一个Job包含多个RDD及作用在相应RDD上的各种Operation;
Stage 每个Job会被拆分为很多组Task, 每一组任务叫Stage, 或者叫TaskSet
Task 被送到某个Excutor上的工作任务
RDD Spark的基本计算单元, 可以通过一系列算子进行操作(主要有Trasformation和 Action操作)
DAG Scheduler 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler
Task Scheduler 将Taskset提交给worker(集群)运行并回报结果

2.1、核心组件

Driver && Excutor 计算组件

  • Driver: 运行Spark Application的main()函数, 并创建SparkContext
Driver 功能
1. 将用户程序(Spark Application)转化为作业(job)
2. 在 Excutor 之间调度任务(task)
3. 跟踪Excutor 的执行情况
4. 通过UI 展示查询运行情况
  • Excutor: Worker Node中的一个JVM进程, 用来执行分配给该节点的Task,任务彼此之间相互独立;
    Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
Excutor核心功能
负责运行组成Spark应用的任务,并将结果返回给驱动器进程
通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

Master && Worker 资源(只存在于StandAlone中)

Spark集群的独立部署环境(StandAlone)中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master和Worker

  • 这里的Master是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM,
  • 而Worker呢,也是进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM。

ApplicationMaster (AM) 计算-> AM -> 资源

Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。

  • 说的简单点就是,ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。

2.2、核心概念

Excutor与Core (核)

在这里插入图片描述

并行度(Parallelism)

  • 在分布式计算框架中一般都是多个任务同时进行,由于任务分布在不同的计算结点进行计算, 所有能够真正的实现多任务并行执行, 记住, 这里是并行, 而不是并发.
  • 这里我们将整个集群并行执行任务的数量称之为并行度, 一个作业的并行度是多少取决于框架的默认配置, 应用程序也可以在运行过程中动态修改;

并行: 同一时刻(某个时间点), 一起执行(parallelism)
并发: 某个时间段, 交替快速执行(Concurrence)

有向无环图(DAG)

在这里插入图片描述


在这里插入图片描述

2.3、Spark提交流程

  • 所谓的提交流程, 其实就是开发人员根据需求写的应用程序通过Spark客户端提交给Spark运行环境执行计算的流程
  • Spark运行架构参见下面示意图:
  1. 构建Spark Application的运行环境(启动SparkContext)
  2. SparkContext向资源管理器(可以是Standalone、Mesos、Yarn)申请运行> Executor资源,并启动StandaloneExecutorBackend,executor向 SparkContext申请Task。
  3. SparkContext将应用程序代码发放给executor
  4. SparkContext构建成DAG图、将DAG图分解成Stage、将Taskset发送给Task Scheduler、最后由Task Scheduler将Task发放给Executor运行。
  5. Task在Executor上运行,运行完毕释放所有资源。

在这里插入图片描述

3、spark核心编程

spark计算框架为了能够进行高并发和高吞吐的shu’ju’chshujuch别,封装了三大数据结构,用于处理不同场景。分别为:

  • RDD:弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

3.1、RDD

3.1.1、什么是RDD

RDD(Resilient Distributed Dataset)弹性分布式数据集,是spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面元素可并行的计算的集合。(最小计算单元)

RDD特点:

  1. 弹性

    • 存储的弹性:内存与磁盘的自动切换;

    • 容错的弹性:数据丢失可以自动恢复;

    • 计算的弹性:计算出错重试机制;

    • 分片的弹性:可根据需要重新分片;

  2. 分布式

    数据存储在大数据集群不同节点上;

  3. 数据集

    RDD 封装了计算逻辑,并不保存数据;

  4. 数据抽象

  5. RDD 是一个抽象类,需要子类具体实现;

  6. 不可变

    RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的RDD 里面封装计算逻辑;

  7. 可分区、并行计算;

RDD数据处理方式类似于IO流,也有装饰者设计模式

RDD数据只有在调用collect方法时候,擦真正执行业务逻辑操作。之前的封装都是功能的拓展。

RDD是不保存数据的,但是IO可以临时保存一部分数据。

3.1.2、RDD核心属性

  1. 分区列表

    RDD数据结构中存在分区列表,将数据进行分区,分区间的数据是完全相互独立的,互不影响,然后交给不同的Task,目的是实现并行计算,是实现分布式计算的重要属性。

  2. 分区计算函数

    Spark在计算时,是使用分区计算函数对每一个分区进行计算。但是,每一个分区计算函数的计算逻辑都是一样的,是由RDD事先封装好,传递到Executor的!

  3. RDD之间的依赖关系

    • RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系。从方法中可以看出一个RDD的依赖是一个列表。
    • 所谓的RDD依赖关系就是包装,想要获取最外层的RDD,就要逐层向内,从最原始的RDD开始构建。
    • RDD不是单依赖,意味着可以多个RDD合成一个RDD
  4. 分区器

    • 分区器制定分区规则
    • 当数据为KV类型数据时,可以通过设定分区器自定义数据的分区
  5. 首选位置

    用于解决将task交给哪个Executor进行运算的问题。

    • 当数据和Executor在一个节点的时候,Task首选分配给当前节点的Executor,这样避免了数据的网络传输,效率最优。
    • 核心概念:移动数据不如移动计算 ;在有数据的节点上创建Executor进行计算

3.1.3、执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合

Spark框架在执行时,先申请资源;然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

RDD是Spark框架中用于数据处理的核心模型,接下来我们看看,在Yarn环境中,RDD的工作原理

  1. 启动Yarn集群环境,此时有了资源

    在这里插入图片描述

  2. Spark通过申请资源创建调度节点和计算节点,也就是Driver和Executor,二者都是运行在NodeManager上

    在这里插入图片描述

  3. Spark框架根据需求将计算逻辑根据分区划分成不同的Task,并将Task放入TaskPool,放进任务池是为了等待被调度

    在这里插入图片描述

  4. 调度节点(Driver)将任务根据计算节点Executor的状态 和 RDD首选位置的配置将Task发送到对应的计算节点进行计算

    在这里插入图片描述

从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算,接下来我们就一起看看Spark框架中RDD是具体是如何进行数据处理的。

3.1.4、RDD的创建

1)从集合(内存)中创建RDD

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("create_memory")
        .set("spark.testing.memory", (512 * 1014 * 1024) + "")
    val sc=new SparkContext(sparkConf)
    val seq = Seq(4,2,24,5.5)
    val rdd1 = sc.parallelize(
        seq
    )
    val rdd2 = sc.makeRDD(
        List(1, 2, 3, 4)
    )
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
    sc.stop()
}
  • local[*]表示用机器的CPU核数模拟多线程执行计算任务,local表示用单核
  • 并行和并发:当电脑有两个CPU核的时候,并行度就是2,如果有4个task,那么并行度也只能是2,剩下的就是并发执行了
  • 从底层代码实现来讲,makeRDD方法其实就是parallelize方法。

2)从文件中创建RDD

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("create_memory")
    .set("spark.testing.memory", (512 * 1014 * 1024) + "")
    val sc=new SparkContext(sparkConf)
    //path可绝对路劲也可相对路径,也可为目录
    val line:RDD[String] = sc.textFile("studyExample/data/1.txt")
    line.collect().foreach(println)
    sc.stop()
}

由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。

  • textFile()中的路径默认以当前环境的根路径为基准,可以写相对路径也可以写绝对路径,而且也可以写目录路径,读取目录下的多个文件
  • Path也可以写通配符 textFile(“datas/1*.txt”)
  • Path也可以是分布式存储系统路径:HDFS: hdfs://hadoop102:8020/text.txt
def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("create_memory")
    .set("spark.testing.memory", (512 * 1014 * 1024) + "")
    val sc=new SparkContext(sparkConf)
    //path可绝对路劲也可相对路径,也可为目录
    val line= sc.wholeTextFiles("studyExample/data")
    line.collect().foreach(println)
    sc.stop()
}
  • wholeTextFile(path)方法:path和textFile方法一样。
  • textFile()以行为单位来读取数据,wholeTextFile以文件为单位读取数据,读取到的都是字符串
  • 读取的结果是一个tuple,第一个元素为文件路径,第二个元素为文件内容

RDD的并行度

(1)RDD并行度

  • 默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度
  • 这个数量可以在构建RDD时指定。记住,并行度 = 并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
  • 三个分区,对应三个Task,但是只有一个Executor,单核,那么无法并行执行,只能并发执行,所以分区和并行度不能划等号。

(2)makeRDD创建RDD分区的数量

  • makeRDD方法第二个参数表示分区的个数,不传递参数那么会使用默认参数defaultParallism,Spark会从配置对象SparkConf中获取配置参数:spark.default.Parallism。如果获取不到,那么使用totalCores属性,此属性取值为当前Spark运行环境的最大可用核数。这里是local,所以是本地机器的CPU个数。
    1.默认:Spark运行环境的core个数
    2.手动指定:在SparkConf对象中配置核数:sparkConf.set(“spark.defaul.parallelism”,”5”)
val dataRDD: RDD[Int] =sc.makeRDD(List(1,2,3,4),4)
val fileRDD: RDD[String] =sc.textFile("data",2)

rdd.saveTextFile("output")可将处理的数据保存为分区文件

从内存中创建RDD分区数据的分配

从内存中创建RDD,数据是如何分配到各个分区中的?
数据分区规则的Spark核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}
Length=5,nums=3, i = 0 1 2
i=0=>[0,1) =>1
i=1=>[1,3) =>2,3
i=2=>[3,5) => 4,5

从文件中创建RDD分区个数的计算规则

从文件创建RDD使用的是Hadoop中的TextFileInputStream,切片规则是一样的。

  1. 默认分区数计量: math.min(defaultParallelism,2)
    即运行环境的CPU核数和2的最小值
  2. 指定分区数量minParitions
    1. totalSize: 目录下所有文件或者文件的总共字节数
    2. goalSize: 每个分区应该存储的字节数。totalSize除以设定的分区数
    3. splitSize : Math.max(minSize, Math.min(goalSize, blockSize));
    4. 对文件按照splitSize进行切片,当剩余的文件字节数<1.1* splitSize,就不切了。

从文件中创建RDD分区数据的分配规则

  1. 分区是按行获取文件中的数据

    采用Hadoop的方式读取数据,一行一行读取,和字节数没有关系

  2. 每个分区获取哪些行呢?

    根据偏移量的范围确定获取哪些行。第一个字节的偏移量为0,一个字节对应1个单位的偏移量。

    • 0号分区偏移量范围:[0,0+goalSize],0号分区获取偏移量0和偏移量goalSize所在的行的全部数据。

    • 1号分区偏移量范围:[goalSize,goalSize+goalSize],1号分区获取偏移量goalSize所在行的下一行到2*goalSize所在行的全部数据。

      在这里插入图片描述

  3. 如果数据源是多个文件,那么是以文件为单位进行分区的。(以文件为单位切片)

3.1.5、RDD算子

RDD的方法称之为算子。

认知心理学认为解决问题其实就是将问题的状态进行改变:
问题(初始)=>操作(算子)=>问题(中间状态1)=>操作(算子)=>……=>问题(解决)

算子分为两类:

  • 转换算子:功能的补充和封装,将旧的RDD包装成新的RDD(eg:map,flatMap)

    转换算子示意图:

    在这里插入图片描述

  • 行动算子:触发任务的调度和执行(collect)

3.1.6、转换算子

RDD根据数据处理方式的不同将算子整体上分为:

  • Value类型:每次处理的元素是1个元素,元素类型为单值
  • 双Value类型:每次处理是两个元素
  • Key-Value类型: 每次处理的是一个元素,元素类型为k-v
value类型
  1. map

    • 函数签名:

      def map[U: ClassTag](f: T => U): RDD[U]
      
    • 函数说明:

      • 将处理的数据逐条进行映射转换;
      • 这里的转换可以是类型的转换,也可以是值的转换
      • 由于只是做转换,因此处理的数据集中数据不会增加也不会减少
    • 演示:

      val dataRDD: RDD[Int] = sct.makeRDD(List(1,2,3,4))
      val dataRDD1: RDD[Int] = dataRDD.map(
          num => {num * 2}
      )
      
    • 特点:

      1) 设置并行度为1 setMaster(“local[1]”)

      • 由于只有一个并行度,因此分区内的元素[1,2,3,4]是顺序执行的;=> 分区内有序执行
      • 算子1和算子2的计算结果是紧挨着的,这意味着分区内是以元素为计算单位的,只有一个元素所有的算子都计算完毕,才会计算下一个元素

      2)将并行度设置为2

      • 分区内有序: 一个分区内的数据是一个一个执行的,只有前面一个数据全部逻辑执行完毕,才会执行下一个数据
      • 分区间无序:分区1和分区2二者并行执行,没有严格的先后顺序

    每个元素挨个计算,从头算到尾再算下一个元素,效率低下

  2. mapPartitions

    • 函数签名:

      def mapPartitions[U: ClassTag](
          f: Iterator[T] => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U]
      
    • 函数说明

      将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

      • 参数: f: Iterator[T] => Iterator[U] 参数是迭代器,返回的必须也是迭代器

      • 执行特点: 以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存中进行引用,分区中处理完的数据不会被释放掉,因为存在对象的引用,只有当分区数据全部处理完,分区中的对象才会被释放掉。

      • 效率比map高,但是当分区内数据量大容易内存溢出。

      • 和map不同点:map处理元素是1进1出,RDD中元素的个数不会变多也不会变少,MapPartitions不会,如下所示,过滤也可

        val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
            datas => {datas.filter(_==2)}
        )
        
    • 演示:

      val sc = new SparkContext(conf)
      val src = sc.makeRDD(List(1, 2, 3, 4),2)
      
      val value = src.mapPartitions(
          iter => {
              println(">>>>>>>>>")
              iter.map(_ * 2)
          }
      )
      value.collect().foreach(x=>println(x))
      
    • 与map对比

      • 数据处理角度
        Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
      • 功能的角度
        Map算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
      • 性能的角度
        Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可
  3. mapPartitionsWithIndex

    • 函数签名:

      def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]
      
    • 函数说明:

      将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,再处理时可以获取当前分区索引。

    • 演示:获取第二个数据分区的数据

      val sc = new SparkContext(conf)
      val src = sc.makeRDD(List(1, 2, 3, 4),2)
      val value = src.mapPartitionsWithIndex((index, iter) => {
          if(index == 1 ) {
      	    iter
          }else{
          	Nil.iterator
          }
      })
      value.collect().foreach(x=>println(x))
      
  4. flatMap

    • 函数签名:

      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
      
    • 函数说明:

      • 在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中
      • flatMap又叫做扁平化,和聚合是相反的功能;聚合是将多个元素聚合成一个集合,扁平化是将一个集合拆解成多个元素;
      • 这个函数的特点是将每个元素变成一个可以迭代的对象(集合),所以对于单个元素的处理,需要将其封装成集合即可
    • 演示

      val src = sc.makeRDD(List(List(1,2),List(3,4)),1)
      val value = src.flatMap(
      	list => list
      )
      value.collect().foreach(x=>println(x))
      
  5. glom

    • 函数签名:

      def glom(): RDD[Array[T]]
      
    • 函数说明:
      将同一个分区的数据直接转换为相同类型的内存数组,以数组为单位处理每个元素,分区不变

    • 演示:

      val src = sc.makeRDD(List(1, 2, 3, 4,5,6),2)
      val glomRDD = src.glom()
      val maxRDD= glomRDD.map(
          arr => {
              arr.max
          }
      )
      println(maxRDD.collect().sum)
      
    • 分区不变的理解

      分区就是将RDD中的数据分成多个相互独立的数据,交给多个Task处理,一个分区对应一个Task,分区不变意思就是RDD1每个分区中的数据经过转换算子后变成RDD2,RDD2每个分区的数据来自RDD1对应的分区

      在这里插入图片描述

  6. groupBy

    • 函数签名:

      def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
      
    • 函数说明:

      • 将元素通过函数获取key;
      • 此算子返回的RDD的数据类型为(K,V)类型,K就是分组的key,value就是该key对应的元素组成的迭代器;
    • 演示:

      val src = sc.makeRDD(List(1, 2, 3, 4,5,6),2)
      val dataRDD = src.groupBy(_ % 2)
      dataRDD.foreach(x=>println(x))
      
    • shuffle过程

      在这里插入图片描述


      如图所示,RDD中数据在RDD之间跨分区了,这就会产生shuffle;对于groupBy分组操作来说,原本不同分区的数据要进入一个分组,一个分组的数据只能在一个分区内,因此必然会产生shuffle

      分组和分区的关系:

      • 分组和分区没有必然联系
      • 分区个数决定了当前RDD将Task划分成多少个subTask,意味着输出的结果的个数
      • 分组只是将RDD中的数据进行分类
      • 一个分区中可以有多个分组的数据
  7. filter

    • 函数签名:

      def filter(f: T => Boolean): RDD[T]
      
    • 函数说明

      1. 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
      2. 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
    • 演示:

      val src = sc.makeRDD(List(1, 2, 3, 4,5,6),1)
      val dataRDD1 = src.filter(_%2 == 0)
      dataRDD1.collect().foreach(println)
      
  8. sample

    • 函数签名:

      def sample(
      	withReplacement:Boolean,//放不放回去
          fraction:Double,//比率
          seed:Long=Utils.random.nextLong//随机种子
      ):RDD[T]
      
    • 函数说明

      根据指定规则从数据集中抽取数据

    • 演示:

      val src = sc.makeRDD(List(1, 2, 3,4,5,6,7,8,9,10),1)
      val dataRDD1 = src.sample(false,0.4,1)
      dataRDD1.collect().foreach(println)
      
  9. distinct

    • 函数签名:

      def distinct()(implicit ord: Ordering[T] = null): RDD[T]
      def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
      
    • 函数说明:

      将数据集中的元素去重

    • 演示

      val src = sc.makeRDD(List(1, 2,2,3,4,4,5,5,6,7,8),1)
      val dataRDD1 = src.distinct()
      dataRDD1.collect().foreach(println)
      
    • 函数原理:

      • Scala中集合去重原理

        在这里插入图片描述

      • Spark去重

        在这里插入图片描述

  10. coalesce

  • 函数签名:

    def coalesce(numPartition:Int,shuffle:Boolean=false,
    partitionCoalescer:Option[partitionCoalescer]=Option.empty)
    (implicit ord:Ordering[T]=null):RDD[T]
    
  • 函数说明:

    缩减分区,一个分区对应一个Task

  • 应用场景

    比如在用了filter算子之后,每个分区的数据量少了很多,这样一来每个Executor计算的数据量很小,但是Executor的调度也占用资源;所以可以用一个Executor来计算

  • 演示:

    • 参数1: 指定分区个数,分区并行计算,一个分区一个Task

      val src = sc.makeRDD(List(1, 2,2,3,4,4,5,5,6,7,8),3)
      val dataRDD1 = src.coalesce(2)
      //        dataRDD1.collect().foreach(println)
      dataRDD1.glom().foreach(x=>println(x.mkString(",")))
      
    • 参数2: 是否开启shuffle,默认为false

      val src = sc.makeRDD(List(1, 2,2,3,4,4,5,5,6,7,8),3)
      val dataRDD1 = src.coalesce(2,true)
      dataRDD1.glom().foreach(x=>println(x.mkString(",")))
      
  1. repartition

    • 函数签名:

      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
      
    • 函数说明

      • 该操作内部其实执行的是coalesce操作,并且参数shuffle的强制为true
      • 开启了shuffle,意味着repartition操作可以完成缩小分区和扩大分区
      • 想要实现扩大分区的效果,必须走shuffle
    • 演示:

      val src = sc.makeRDD(List(1, 2,2,3,4,4,5,5,6,7,8),3)
      val dataRDD1 = src.repartition(2)
      dataRDD1.glom().foreach(x=>println(x.mkString(",")))
      
    • coalesce和repartition总结

      • coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
      • coalesce默认不开启shuffle,一般为缩减分区使用,如果扩大分区,也不会增加分区总数,意义不大。并且缩减分区,导致数据倾斜;
      • repartition实际上是调用的coalesce,进行shuffle,可以达到扩大分区的效果。
  2. sortBy

    • 函数签名:

      def sortBy[K](
        f: (T) => K,
        ascending: Boolean = true,
        numPartitions: Int = this.partitions.length)    
        (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
      
      
    • 函数说明

      • 该算子对RDD数据进行全局排序,然后再分到各个分区
      • 在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。
      • 排序后新产生的RDD的分区数默认与原RDD的分区数一致。
      • 中间存在shuffle的过程,可以修改分区数;
    • 演示:

      val src = sc.makeRDD(List(10, 6,2,7,4,5,4,5,6,7,8),1)
      val dataRDD1 = src.sortBy(num => num, false, 1)//排序规则,升序,分区数目
      dataRDD1.glom().foreach(x=>println(x.mkString(",")))
      
双Value类型
  1. intersection

    • 函数签名

      def intersection(other: RDD[T]): RDD[T]
      
    • 函数说明

      对源RDD和参数RDD求交集后返回一个新的RDD
      经过shuffle

    • 演示

      val dataRDD1 = sc.makeRDD(List(1,2,3,4,5,6,7))
      val dataRDD2 = sc.makeRDD(List(3,4,5,6))
      val dataRDD = dataRDD1.intersection(dataRDD2)
      dataRDD.glom().foreach(arr=>println(arr.mkString(",")))
      
  2. union

    • 函数签名

      def union(other: RDD[T]): RDD[T]
      

      不经过shuffle

    • 函数说明

      对源RDD和参数RDD求并集后返回一个新的RDD

    • 演示

      val dataRDD1 = sc.makeRDD(List(1,2,3,4),2)
      val dataRDD2 = sc.makeRDD(List(13,14,15,16),2)
      val dataRDD = dataRDD1.union(dataRDD2)
      dataRDD.glom().foreach(arr=>println(arr.mkString(",")))
      
  3. substract

    • 函数签名

      def substract(other: RDD[T]): RDD[T]
      
    • 函数说明

      差集,左边的rdd减去右边的rdd

    • 演示

      val dataRDD1 = sc.makeRDD(List(1,2,3,4))
      val dataRDD2 = sc.makeRDD(List(3,4,5,6))
      val dataRDD = dataRDD1.subtract(dataRDD2)
      dataRDD.glom().foreach(arr=>println(arr.mkString(",")))
      
    • 交并差总结

      两个RDD的数据类型必须一致

  4. zip

    • 函数签名

       def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
      
    • 函数说明

      1. 将两个RDD中的元素,按照位置拉在一起,形成tuple
      2. 其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。
      3. 没有shuffle
      4. 两个RDD必须分区数一致并且每个分区内元素个数必须一致
      5. 两个RDD数据类型不一致没问题
      6. 两个RDD数据分区不一致(Error)
      7. 两个RDD分区数据数量不一致(Error)
    • 演示

      val dataRDD1 = sc.makeRDD(List(1,2,3,4))
      val dataRDD2 = sc.makeRDD(List(3,4,5,6))
      val dataRDD = dataRDD1.zip(dataRDD2)
      dataRDD.glom().foreach(arr=>println(arr.mkString(",")))
      
Key-Value类型
  • kv型算子指的就是只能作用于元素类型为KV(tuple)的RDD
  • 这些算子不属于RDD类,而是PairRDDFunctions[K, V]类提供
  1. partitionBy

    • 函数签名

      def partitionBy(partitioner: Partitioner): RDD[(K, V)]
      
    • 函数说明

      • 将RDD[K,V]中的K按照指定Partitioner重新进行分区。

      • 如果原有的RDD和新的RDD的Partitioner是一致的话就不进行分区,否则会产生Shuffle过程。

      • Spark默认的分区器是HashPartitioner

      注意: coalesce和repartition算子是改变RDD的分区数,这里是改变RDD的分区规则

    • 演示

      val rdd : RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
      val rdd2 = rdd.partitionBy(new HashPartitioner(2))
      
    1. HashPartitioner

      实现原理:就是根据key的hash值,和分区数取模运算

    2. PartitionBy指定分区的问题

      1.如果重分区的分区器和当前RDD的分区器一样怎么办?

      答:在pairRDDFunctions类中,partitionBy方法会判断传入的分区器和当前RDD的分区器是否一样,如果一样,就返回RDD本身,也就是不做任何操作

      2.Spark还有其他分区器吗?

      在Partitioner类中,Ctrl+H可以看实现类:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oQleP8u3-1661340490861)(C:\Users\高晓如\AppData\Roaming\Typora\typora-user-images\image-20220704110252086.png)]

      RangePartitioner用于排序场景;SortBy算子用到了这个分区器

    3. 自定义分区器

      要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

      1. numPartitions: Int:返回创建出来的分区数。

      2. getPartition(key: Any):Int 返回给定键的分区编号(0到numPartitions-1)。

      3. equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同

  2. reduceByKey

    • 函数签名:

      def reduceByKey(func: (V, V) => V): RDD[(K, V)]
      def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
      
    • 函数说明

      • 该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合,对V是两两聚合
      • 可以设置新RDD的分区数。
    • 演示

      val dataRDD1 = sct.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
      val rdd1 : RDD[(String, Int)] = dataRDD1.reduceByKey(_ + _)
      val rdd2 = dataRDD1.reduceByKey(_+_, 2)
      rdd1.glom().foreach(x=>println(x.mkString(","))) //(a,5),(b,2),(c,3)
      rdd2.glom().foreach(x=>println(x.mkString(",")))
      
  3. groupByKey

    • 函数签名:

      def groupByKey(): RDD[(K, Iterable[V])]
      def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
      def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
      
    • 函数说明

      • groupByKey对每个key进行操作,但只生成一个seq,并不进行预聚合。
      • 该操作可以指定分区器或者分区数(默认使用HashPartitioner)
    • 演示

      //todo 1.groupBykey 和 groupBy的区别
      val rdd : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a", 2)))
      val rdd1 : RDD[(String, Iterable[Int])] = rdd.groupByKey()
      val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
      //todo 2.groupBykey 也可以指定分区器
      val rdd3: RDD[(String, Iterable[Int])] = rdd.groupByKey(new HashPartitioner(2))
      rdd1.glom().foreach(x=>println(x.mkString(","))) 
      rdd2.glom().foreach(x=>println(x.mkString(","))) 
      

      注意两点:

      1. groupBykey 和 groupBy的区别
        groupByKey: tuple类型,(key,value组成的迭代器)
        groupBy: tuple类型,(key,(key,value)组成的迭代器)
      2. groupBykey 也可以指定分区器
    • groupByKey和reduceByKey的区别

      1.功能上

      1. reduceByKey以相同key为一组做聚合
      2. groupByKey 只做分组不聚合
      3. 二者都有shuffle

      2.shuffle的影响

      在这里插入图片描述


      上图的例子中,假设场景是先分组,然后对每个分组做聚合;

      • 由于groupByKey涉及shuffle,一个key的所有数据需要从每个分区获取,才能获取一个分组的完整数据,因此在做聚合运算之前,必须等待每个分区都groupByKey执行结束;
      • 也就是说如果没有shuffle,分区之间各算各的互不影响,一旦有shuffle,分区必须等待,所有分区执行完才能进入下一步操作
      • 由于shuffle要等待所有分区执行完毕,因此分区A shuffle完毕,需要等待别的分区shuffle完,在等待过程中,内存中数据会越来越多,因此shuffle操作必须要落盘,如上图所示,一个分区的shuffle数据会写入磁盘中
      • shuffle会落盘,那么就有磁盘IO,所以性能低下

      3.reduceByKey预聚合

      在这里插入图片描述

      • reduceByKey在执行shuffle前,会先在分区内对分组数据做聚合,因此在shuffle落盘的时候,数据量会大大减少;这个原理和MapReduce中的Combiner是一样的
      • 在shuffle完毕后,分区间的数据集结完毕,再做分区间的数据聚合

      4.总结

      • 从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,因此reduceByKey性能比较高。
    • 从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

  4. aggregateByKey

    • 函数签名:

      def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)]
      
    • 函数说明

      1. 将数据根据不同的规则进行分区内计算和分区间计算

        • 分区内计算:在shuffle之前完成分组部分数据的计算
        • 分区间计算:在shuffle之后完成分组完整数据的计算
      2. zeroValue的类型决定了最终聚合完后value的类型

      3. zeroValue只参与分区内计算,当初始值;

      4. 也是两两运算

    • 演示:

      val src =sc.makeRDD(List(("a",1),("b",2),("a",2),("c",3),("b",2),("c",3)))
      val rdd1 =src.aggregateByKey("x")(_+_,_+_)
      rdd1.glom().foreach(x=>println(x.mkString(",")))
      
  5. foldByKey

    • 函数签名:

      def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
      
    • 函数说明:

      当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

    • 演示:

      val src=sc.makeRDD(List(("a",1),("b",2),("a",2),("c",3),("b",2),("c",3)))
      val rdd1 =  src.foldByKey(0)(_+_)
      rdd1.glom().foreach(x=>println(x.mkString(",")))
      
  6. combineByKey

    • 函数签名:

      def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)]
      
      • createCombiner, which turns a V into a C (e.g., creates a one-element list)
        这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
      • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
        该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
      • mergeCombiners, to combine two C’s into a single one.
        该函数把2个元素C合并 (这个操作在不同分区间进行)
    • 函数说明

      • 最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。
      • 类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致
      • 和aggregateByKey的区别是,无法自定义zeroValue,而是将分组内第一元素做map映射,转换类型,最终聚合结果的value和这个类型保持一致
    • 演示:

      val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
      val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
      
      val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
          (_, 1),  //对同key的第一个数据进行map转换
          (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
          (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  //分区间运算
      )
      

      注意:由于在运行时对第一个元素做了类型转换,因此在做分区内和分区间运算的时候,将类型给写上;

    • 上述四个方法底层调用的是同一个方法;

  7. sortByKey&sortByValue

    • 函数签名:

      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)]
      
    • 函数说明:

      • 有shuffle
      • 根据key来排序的
      • 在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的
    • 演示:

      val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4),("a",5)),4)
      val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true,2)
      val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)
      
      val tuples: Array[(String, Int)] = sortRDD1.collect()
      tuples.foreach(println)
      
      val value: RDD[(Int, (String, Int))] = sortRDD1.mapPartitionsWithIndex((index, iter) => {
          iter.map(x => {
              (index, x)
          })
      })
      value.collect().foreach(println)
      
双Key-Value类型
  1. join

    • 函数签名:

      def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
      
    • 函数说明:

      在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

    • 演示:

      val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(2,"d")))
      val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6),(4,5),(2,6)))
      val rdd2 = rdd.join(rdd1, 2)
      
      val result: RDD[(String, (Int, (String, Int)))] = rdd2.mapPartitionsWithIndex((index, iter) => {
      iter.map(("分区号" + index, _))
      })
      result.collect().foreach(println)
      
    • 特点

      • 笛卡尔积
        RDD1中,key的value有n个,RDD2中,同一个key的value有m个,则会输出n*m个join结果, join尽量不要用
      • 输出结果类型
        (key1,(value1,value2))
      • inner join
        Join不保留两个RDD中key不匹配的key
      • 必然走shuffle
  2. leftOutJoin

    • 函数签名:

      def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
      
    • 函数说明:

      类似于SQL语句的左外连接

    • 演示:

      val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"),(2,"d"),(5,"e")))
      val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6),(4,5),(2,6)))
      rdd.leftOuterJoin(rdd1).collect().foreach(println)
      
    • 特点

      1. 输出结果类型
        (key1,(value1,value2))
      2. left join
        保留左边RDD的结果,没有匹配上为: (key1,(value1,None));
        不保留右边没匹配上的数据
      3. 必然走shuffle
    1. cogroup

      • 函数签名:

        def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
        
      • 函数说明

        • 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
        • 好处就是不会产生笛卡尔积,而是将每个RDD中相同key的value先封装到一个集合中,再进行outJoin,如此一来每一个key只有一个结果
      • 演示

        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"),(2,"d"),(5,"e")))
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6),(4,5),(2,6)))
        
        val value: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd1)
        value.collect().foreach(println)
        
      • 特点

        Cogroup可能存在shuffle,当数据源RDD的分区器不同,就会shuffle

3.1.7、行动算子

  • 行动算子触发任务执行,sc.runjob(),创建activeJob,提交并执行。
  • 行动算子的返回值是一个结果,而不是RDD
  1. reduce

    • 函数签名:

      def reduce(f: (T, T) => T): T
      
    • 函数说明:

      聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 聚合数据
      val reduceResult: Int = rdd.reduce(_+_)
      println(reduceResult) //10
      
  2. collect

    • 函数签名:

      def collect(): Array[T]
      
    • 函数说明:

      • 以数组Array的形式收集RDD的所有元素到驱动程序(Driver)中
      • 以分区为单位,从0号分区将数据按顺序收集采集到Driver
    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 收集数据到Driver
      rdd.collect().foreach(println)
      
  3. count

    • 函数签名:

      def count(): Long
      
    • 函数说明:

      返回RDD中元素的个数

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 返回RDD中元素的个数
      val countResult: Long = rdd.count()
      
  4. first

    • 函数签名:

      def first(): T
      
    • 函数说明:

      返回RDD中的第一个元素

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 返回RDD中元素的个数
      val firstResult: Int = rdd.first()
      println(firstResult)
      
  5. take

    • 函数签名:

      def take(num: Int): Array[T]
      
    • 函数说明:

      返回一个由RDD的前n个元素组成的数组

    • 演示:

      vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 返回RDD中元素的个数
      val takeResult: Array[Int] = rdd.take(2)
      println(takeResult.mkString(","))
      
  6. takeOrdered

    • 函数签名:

      def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
      
    • 函数说明:

      返回该RDD排序后的前n个元素组成的数组

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
      // 返回RDD中元素的个数
      val result: Array[Int] = rdd.takeOrdered(2) //1 2
      
  7. aggregate

    • 函数签名:

      def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      
    • 函数说明:

      • 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
      • 哪个分区先计算完谁在前面
      • 初始值只参与1次分区间计算,多个分区只参与一次
      • 聚合结果的数据类型和zeroValue的类型是一样的
    • 演示:

      val list1 = List(1,2,3,4,5,6,7,8,9,10)
      val rdd1: RDD[Int] = sc.makeRDD(list1,4)
      val value = rdd1.mapPartitionsWithIndex((index, iter) => {
          iter.map(x => {
              (index, x)
          })
      })
      value.collect().foreach(println)
      
      val str: String = rdd1.aggregate("+")(_ + _, _ + _)
      println(str)
      //++345+12+8910+67
      
      //todo aggregate
      // zeroValue : 参与一次分组内计算,做初始值
      //             参与一次分组间计算,做初始值
      //todo aggregateByKey 是行动算子
      // zeroValue: 只参与一次分组内计算,做初始值
      
  8. fold

    • 函数签名:

      def fold(zeroValue: T)(op: (T, T) => T): T
      
    • 函数说明:

      aggregate的简化版操作

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
      val foldResult: Int = rdd.fold(0)(_+_)
      
  9. countByKey&countByValue

    • 函数签名:

      def countByKey(): Map[K, Long]
      
    • 函数说明:

      以键或值计数

    • 演示:

      val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",3),("c",3)))
      dataRDD1.countByKey().foreach(println)
      dataRDD1.countByValue().foreach(println)
      
  10. save

    • 函数签名:

      def saveAsTextFile(path: String): Unit
      def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
      
    • 函数说明:

      saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

    • 演示:

      val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",3),("c",3)))
      // 保存成Text文件
      rdd.saveAsTextFile("output")
      // 序列化成对象保存到文件
      rdd.saveAsObjectFile("output1")
      // 保存成Sequencefile文件 只能用于KV类型RDD
      //  rdd.map((_,1)).saveAsSequenceFile("output2")
      
  11. foreach

    • 函数签名:

      def foreach(f: T => Unit): Unit = withScope {
          val cleanF = sc.clean(f)
          sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }
      
    • 函数说明:

      分布式遍历RDD中的每一个元素,调用指定函数

    • 演示:

      val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
      // 1.收集后打印
      rdd.map(num=>num).collect().foreach(println)
      println("****************")
      // 2.分布式打印
      rdd.foreach(println)
      

      collect先采集到Driver,这个foreach是内存中的集合的遍历方法

      foreach将数据发给不同的executor端,进行打印,是分布式打印
      从上面的打印结果可以看出来,收集后,由于是按照分区序号收集的,因此是按顺序打印的;而分布式打印,是并行的,哪个分区在前是不一定的

      • collect后打印:

        在这里插入图片描述

      • 直接调用foreach算子打印:

        在这里插入图片描述

    • 算子的本质

      • 算子是RDD的方法,和Scala集合对象的方法不同
      • 集合对象的方法都是在同一个节点的内存中完成的
      • RDD的算子可以将计算逻辑发送到Executor端(分布式节点)内存执行
      • 为了区分不同的处理效果,所以将RDD的方法称之为算子
      • RDD的算子外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor中执行的。

3.1.8、序列化

闭包检测

  1. RDD算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。
  2. 当算子内用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,必须检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

问题引入

1)下面是一段Spark程序:

object _01_闭包检测 {
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]") setAppName ("CreateRDD")
        val sparkContext: SparkContext = new SparkContext(sparkConf)

        val rdd: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
        val user  = new User()
        //TODO User对象是在Driver端创建的,RDD算子中用到了
        // user的属性,所以需要在网络传输,就必须进行序列化
        rdd.foreach(
            num=>{
                println("age"+(user.age+num))
            }
        )
    }
    class User{
        var age : Int = 30
    }
}

执行结果报错:

org.apache.spark.SparkException: Task not serializable

原因:

在这里插入图片描述


在Driver端创建的User对象,在RDD算子内使用了,因此需要将此对象发送到Executor端;但是User没有实现序列化

2)解决方法 : 实现序列化接口

在主程序中,算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行;算子中用到了User类的age属性,这意味着这个User对象必须从Driver端发送到Executor端,因此User必须实现序列化

//方式1:直接混入Serializable特质
class User extends Serializable {
  var age : Int = 30
}
//方式2:通过用样例类,样例类自动混入序列化特质
case class User(){
  var age : Int = 30
}

闭包检测的时机

object _01_闭包检测发生时机 {
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]") setAppName ("CreateRDD")
        val sparkContext: SparkContext = new SparkContext(sparkConf)

        //TODO 用一个数据为空的RDD来检测闭包检测发生时机
        val rdd: RDD[Int] = sparkContext.makeRDD(List())
        val user  = new User()
        rdd.foreach(
            num=>{
                println("age"+(user.age+num))
            }
        )
    }
    case class User(){
        var age : Int = 30
    }
}

  • 空的RDD无法遍历,因此不会执行foreach算子中的内容,但是会发现仍然报错;这意味着不是在执行foreach算子内容的时候进行的闭包检测,而是在执行前进行闭包检测;
  • 当函数用到了函数外部数据,就会执行闭包操作,闭包会在函数内部引入外部变量,改变此变量的生命周期,不需要真正的执行这个函数,只需要在闭包的时候检测传进来的变量是不是可序列化的就行了。
  • 所以闭包检测发生在函数执行前,闭包的时候。

序列化实际执行时的问题

object serializable{
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
        //一个Search对象
        val search = new Search("hello")
        //todo 函数传递,打印:ERROR Task not serializable
        search.getMatch1(rdd).collect().foreach(println)
        //todo  属性传递,打印:ERROR Task not serializable
        search.getMatch2(rdd).collect().foreach(println)
        search.getMatch3(rdd).collect().foreach(println)
        sc.stop()
    }
}
//TODO 查询对象
class Search(query:String) extends Serializable {
    //todo  1.用外置函数查询rdd中的元素有没有包含query
    def isMatch(s: String): Boolean = {
        s.contains(query)  // todo  这个query = this.query 是Search类的属性
    }
    def getMatch1 (rdd: RDD[String]): RDD[String] = {
        //rdd.filter(this.isMatch)
        rdd.filter(isMatch)
    }
    //todo getMatch1方法中调用了rdd.filter(isMatch)
    // filter算子中的isMatch方法只用到了query 字符串类型,为什么也不能通过
    // 原因:类的构造参数是类的属性,query就是Search类的属性
    // 反编译就能看到了  在java中是  private final String query
    // 构造参数需要进行闭包尖刺,其实就等同于类进行闭包检测
    //todo 2.用匿名函数方式查询
    def getMatch2(rdd: RDD[String]): RDD[String] = {
        rdd.filter(x => x.contains(query))
        //val q = query
        //rdd.filter(x => x.contains(q))
    }
    def getMatch3(rdd: RDD[String]): RDD[String] = {
        //todo 3.getMatch2的改进
        // 调用getMatch3的时候Search不用序列化也能通过
        val q = query
        rdd.filter(x => x.contains(q))
        // val q = query 在driver端执行
        //filter算子中的逻辑执行在Executor端
        //filter算子闭包引用的是q,q是方法的局部变量,和类没有关系
        //形成闭包,q是字符串类型,可序列化
    }
}

Kryo序列化框架

  1. Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,数据在网络中传输比较慢。
  2. Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。
  3. 当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。其他类型还不能用Kryo序列化
  4. 注意:即使使用Kryo序列化,也要继承Serializable接口。
  5. kryo序列化仍然会序列化java中transient修饰的字段

Kryo序列化的使用方式:

object serializable_Kryo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        .setAppName("SerDemo")
        .setMaster("local[*]")
        // 替换默认的序列化机制
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // 注册需要使用 kryo 序列化的自定义类
        .registerKryoClasses(Array(classOf[Searcher]))
        val sc = new SparkContext(conf)
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
        val searcher = new Searcher("hello")
        val result: RDD[String] = searcher.getMatchedRDD1(rdd)
        result.collect.foreach(println)
    }
}
case class Searcher(val query: String) {
    def isMatch(s: String) = {
        s.contains(query)
    }
    def getMatchedRDD1(rdd: RDD[String]) = {
        rdd.filter(isMatch) 
    }
    def getMatchedRDD2(rdd: RDD[String]) = {
        val q = query
        rdd.filter(_.contains(q))
    }
}

3.1.9、依赖关系

1. RDD 血缘关系

在这里插入图片描述


依赖关系:两个相邻RDD之间的关系
血缘关系:多个连续的RDD的依赖关系

2. RDD血缘关系的演示

下图演示了RDD的血缘关系:

在这里插入图片描述

  • RDD是不会保存数据的,但是每个RDD会保存自己的血缘关系;
  • 血缘关系的意义:因为RDD不保存数据,一旦计算失败了,不能从上一个RDD重新计算,必须重头计算,那么RDD必须要知道数据源在哪里,血缘关系就用于追溯数据源,提高了容错性

血缘关系演示

val fileRDD: RDD[String] = sc.textFile("SparkCore/target/classes/wc.txt")
println(fileRDD.toDebugString)
println("***********************")

val words: RDD[String] = fileRDD.flatMap(_.split(" "))
println(words.toDebugString)
println("***********************")

val wordToOne: RDD[(String, Int)] = words.map((_, 1))
println(wordToOne.toDebugString)
println("***********************")

val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.toDebugString)
println("***********************")

val tuples: Array[(String, Int)] = wordToSum.collect()
tuples.foreach(println)

//--------------------------------------结果----------------------------------
(2) SparkCore/target/classes/wc.txt MapPartitionsRDD[1] at textFile at wordcount.scala:17 []
 |  SparkCore/target/classes/wc.txt HadoopRDD[0] at textFile at wordcount.scala:17 []
***********************
(2) MapPartitionsRDD[2] at flatMap at wordcount.scala:21 []
 |  SparkCore/target/classes/wc.txt MapPartitionsRDD[1] at textFile at wordcount.scala:17 []
 |  SparkCore/target/classes/wc.txt HadoopRDD[0] at textFile at wordcount.scala:17 []
***********************
(2) MapPartitionsRDD[3] at map at wordcount.scala:25 []
 |  MapPartitionsRDD[2] at flatMap at wordcount.scala:21 []
 |  SparkCore/target/classes/wc.txt MapPartitionsRDD[1] at textFile at wordcount.scala:17 []
 |  SparkCore/target/classes/wc.txt HadoopRDD[0] at textFile at wordcount.scala:17 []
***********************
(2) ShuffledRDD[4] at reduceByKey at wordcount.scala:29 []
 +-(2) MapPartitionsRDD[3] at map at wordcount.scala:25 []
    |  MapPartitionsRDD[2] at flatMap at wordcount.scala:21 []
    |  SparkCore/target/classes/wc.txt MapPartitionsRDD[1] at textFile at wordcount.scala:17 []
    |  SparkCore/target/classes/wc.txt HadoopRDD[0] at textFile at wordcount.scala:17 []
***********************
(hive,1)
(mapreduce,1)
(flink,1)
(spark,1)
(hadoop,2)

从上面可以看到一个RDD所经过的算子
并且可以看到这个算子是否有shuffle

  • ±:表示依赖断开,也就是经历了shuffle
  • (1) 表示分区

3.RDD依赖关系演示

val fileRDD: RDD[String] = sc.textFile("SparkCore/target/classes/wc.txt")
println(fileRDD.dependencies)
println("***********************")

val words: RDD[String] = fileRDD.flatMap(_.split(" "))
println(words.dependencies)
println("***********************")

val wordToOne: RDD[(String, Int)] = words.map((_, 1))
println(wordToOne.dependencies)
println("***********************")

val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.dependencies)
println("***********************")

val tuples: Array[(String, Int)] = wordToSum.collect()
tuples.foreach(println)

结果

image-20220824192453599

  1. oneToOneDependency

    在这里插入图片描述

    • oneToOneDependency又叫做窄依赖

      在这里插入图片描述

    • 窄依赖表示每一个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
  2. shuffleDependency
    • shuffleDependency又叫做宽依赖

      在这里插入图片描述

    • 宽依赖表示同一个父(上游)RDD的Partition被多个子(下游)RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为多生。

3.1.10、RDD持久化 和 checkpoint

RDD持久化
  • RDD复用存在重复计算的问题

    如果一个RDD重复使用,只是RDD对象重用,但是数据不重用,因此会从头再次执行来获取数据;

  • RDD 持久化后复用 不需要从头计算

RDD 持久化方式

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中

cache与persist()

① RDD.cache() : 将数据保存到内存中
② RDD.persist()
cache()方法底层调用的就是persist(StorageLevel.MEMORY_ONLY)方法,持久化在内存中

在这里插入图片描述

在这里插入图片描述

持久化级别

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

在这里插入图片描述

  • Memory_only:只持久化到内存,内存不足,则丢弃数据
  • 后缀有2的表示会做备份。

持久化时机

  • cache()和persist()两个方法不是被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
  • 因为没有action算子,就不会执行,就不会有数据

持久化的应用场景

  1. RDD的数据重用
  2. RDD的血缘依赖比较长,某个RDD数据比较重要的时候,可以对这个RDD做持久化

持久化扩展知识

  1. 持久化数据丢失,只重算受影响分区

    RDD持久化数据是以分区为单位的,如果某个分区的缓存丢失了,比如存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition

  2. shuffle也会持久化

    • Spark的Shuffle操作,中间数据也会做持久化操作(比如:reduceByKey),也就是一个stage和下一个stage交接处会产生持久化。

    • 这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

CheckPoint
  • 检查点其实就是通过将RDD中间结果写入磁盘
  • CheckPoint必须要设置检查点路径,一般都是分布式文件系统中比如HDFS
  1. 演示:

    //todo 1.设置检查点路径
    sc.setCheckpointDir("hdfs://xxx")
    val list = List("Hello Scala","Hello Spark")
    val rdd =sc.makeRDD(list)
    val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))
    val mapRDD: RDD[(String, Int)] = flatRDD.map(word => {
        println("###########")
        (word, 1)
    })
    //todo 2.ck
    mapRDD.checkpoint()
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    reduceRDD.collect().foreach(println)
    println("***************")
    val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
    groupRDD.collect().foreach(println)
    
  2. checkpoint源码

    在这里插入图片描述

    • Collect()算子再调用runjob方法后,会调用一个doCheckpoint(),如果设置了checkpoint()就会执行doCheckpoint();

    • doCheckpoint()中会重新提交job,所以会执行两遍job

      在这里插入图片描述

  3. checkPoint()执行时机

    • RDD的checkpoint操作并不会马上被执行,必须执行Action操作才能触发;
      此时会将RDD中的数据写到检查点目录下;

    • 做checkpoint的时候,写入hdfs的数据主要包括:RDD每个分区的实际数据

  4. checkpoint和cache的联合使用

    • 由于在行动算子触发执行的时候,checkpoint会再次独立执行一遍作业,所以会在调用checkpoint()之前先做一个cache()操作,可以从缓存中取数据,避免了重头执行。

      mapRDD.cache()
      mapRDD.checkpoint()
      
  5. checkpoint会切断血缘关系

    checkpoint 将数据写到hdfs之后,会调用rdd的markCheckPoint()方法,斩断该RDD对上游的依赖

    1. cache对血缘关系的影响

      //todo 1.设置检查点路径
      //sc.setCheckpointDir("checkpoint")
      val list = List("Hello Scala","Hello Spark")
      val rdd =sc.makeRDD(list)
      val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))
      val mapRDD: RDD[(String, Int)] = flatRDD.map(word => {
          (word, 1)
      })
      mapRDD.cache()
      //todo 1.在行动算子之前查看血缘关系
      println(mapRDD.toDebugString)
      val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
      reduceRDD.collect().foreach(println)
      println("***************")
      val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
      groupRDD.collect().foreach(println)
      //todo 2.在行动算子之后查看血缘关系
      println(mapRDD.toDebugString)
      

      执行结果:

      行动算子执行前的依赖关系

      image-20220824192136079

      行动算子执行后的依赖关系

      image-20220824192150491

      1.行动算子执行之前,可以看到mapRDD的三个血缘关系
      2.行动算子执行之后,会在血缘关系中添加新的依赖,一旦持久化失效了或者出现问题了,可以重头读取数据。

    2. checkpoint对血缘关系的影响

      //todo 1.设置检查点路径
      sc.setCheckpointDir("checkpoint")
      val list = List("Hello Scala","Hello Spark")
      val rdd =sc.makeRDD(list)
      val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))
      val mapRDD: RDD[(String, Int)] = flatRDD.map(word => {
          (word, 1)
      })
      mapRDD.checkpoint()
      //todo 1.在行动算子之前查看血缘关系
      println(mapRDD.toDebugString)
      val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
      reduceRDD.collect().foreach(println)
      println("***************")
      val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
      groupRDD.collect().foreach(println)
      //todo 2.在行动算子之后查看血缘关系
      println(mapRDD.toDebugString)
      

      行动算子执行前:

      image-20220824191837100

      行动算子执行后:

      image-20220824191858214

      • Checkpoint执行后,会切断血缘关系,重新建立血缘关系。
      • 因为checkpoint将rdd的结果保存到分布式文件系统中,等同于数据源发生了改变。
  6. checkpoint和persist持久化的区别

    1. checkpoint是持久化到磁盘中,persist可以持久化到内存也可以持久化到磁盘

    2. persist持久化到磁盘中不需要设置路径,是因为persist持久化是临时文件,当作业(Job)执行完毕,该文件就会被删除,检查点则不会被删除,因此checkpoint是用于作业恢复的

    3. 血缘关系切断与否

3.1.11、分区器

分区两要素

分区有两个要素:1.分区个数 2.分区规则
在RDD中,我们一般只指定了分区个数,并没有写分区规则,分区规则就由分区器决定。

Spark自带分区器

  1. Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。
  2. Range分区器要求必须能够排序,range能解决数据倾斜

分区器特点

  • 分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数,也就是输出文件的个数。
  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
  • 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

自定义分区器

  1. 分区器怎么写?
    可以参考HashPartitioner怎么写的,此例子中是固定了分区个数,hashpartitioner是作为构造参数传进来的

    在这里插入图片描述

  2. 自定义分区器怎么用
    在可以传入分区器的算子中使用,比如partitioneBy

    在这里插入图片描述

HashPartitioner

分区规则:对于给定的key,计算其hashCode,并除以分区个数取余

class HashPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    def numPartitions: Int = partitions
    def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    }
    override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
        h.numPartitions == numPartitions
        case _ =>
        false
    }
    override def hashCode: Int = numPartitions
}

Range分区器

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
extends Partitioner {

    // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
    require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

    private var ordering = implicitly[Ordering[K]]

    // An array of upper bounds for the first (partitions - 1) partitions
    private var rangeBounds: Array[K] = {
        ...
    }

    def numPartitions: Int = rangeBounds.length + 1

    private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

    def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
            // If we have less than 128 partitions naive search
            while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
                partition += 1
            }
        } else {
            // Determine which binary search method to use only once.
            partition = binarySearch(rangeBounds, k)
            // binarySearch either returns the match location or -[insertion point]-1
            if (partition < 0) {
                partition = -partition-1
            }
            if (partition > rangeBounds.length) {
                partition = rangeBounds.length
            }
        }
        if (ascending) {
            partition
        } else {
            rangeBounds.length - partition
        }
    }
    override def equals(other: Any): Boolean = other match {
        ...
    }
    override def hashCode(): Int = {
        ...
    }
    @throws(classOf[IOException])
    private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
        ...
    }
    @throws(classOf[IOException])
    private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
        ...
    }
}

3.1.12、文件保存与读取

Spark的数据读取及数据保存可以从两个维度来作区分:

  • 文件格式
    文件格式分为:text文件、csv文件、sequence文件以及Object文件;
  • 文件系统。
    文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
  1. 从text文件读取数据,和写到text文件中

    // 读取输入文件
    val inputRDD: RDD[String] = sc.textFile("input/1.txt")
    // 保存数据
    inputRDD.saveAsTextFile("output")
    

    一个分区对应一个文件

  2. sequence文件

    • SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。

    • 在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)

    • 只能用于KV类型的RDD

      // 保存数据为SequenceFile
      dataRDD.saveAsSequenceFile("output")
      // 读取SequenceFile文件
      sc.sequenceFile[Int,Int]("output").collect().foreach(println)
      

      泛型是保存的时候数据的类型

  3. object对象文件
    对象文件是将对象序列化后保存的文件,采用Java的序列化机制。
    可以通过objectFile[T: ClassTag](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

    // 保存数据
    dataRDD.saveAsObjectFile("output")
    // 读取数据
    sc.objectFile[Int]("output").collect().foreach(println)
    

    泛型是保存的时候数据的类型

3.2、累加器

  1. 定义

    • 累加器是分布式的共享只写变量
      共享:累加器的值由Driver端共享给Executor端
      只写:Executor端互相之间读取不到对方的累加器

    • 累加器可以替换一些需要shuffle的操作

  2. 问题引入

    val sourceRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    //TODO 1.用累加替换reduce()
    //直接遍历就完事了
    var sum = 0;
    sourceRDD.foreach(
        num=>{
            sum += num
        }
    )
    println("sum="+sum)
    

    执行结果为0

  3. 原因分析

    Driver端的变量,通过RDD的算子闭包发送到Executor端,Executor分别获取自己分区的数据,还有计算逻辑。

    在这里插入图片描述


    executor1计算: sum = 1+2
    executor2计算: sum = 3+4

    但是Executor计算完后,应该将计算结果返回到Driver端,Spark闭包只知道将数据带到Executor,不知道往Driver端返回。

    在这里插入图片描述

  4. 引入累加器

    累加器是Spark中第二种数据结构,Spark会将累加器从Driver传递到Executor,再将结果返回到Driver。

    在这里插入图片描述


    累加器用来把Executor端变量信息聚合到Driver端,在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge

  5. 累加器的使用

    val rdd = sc.makeRDD(List(1,2,3,4,5))
    //todo 1.获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
    var sum = sc.longAccumulator("sum"); //sum是累加器的名字
    
    rdd.foreach(
        num => {
            // todo 2.使用累加器
            sum.add(num)
        }
    )
    // todo 3.获取累加器的值
    println("sum = " + sum.value)
    

    系统累加器有:longAccumulator,DoubleAccumulator,还有CollectionAccumulator(List)

  6. 累加器使用注意事项

    1. 少加问题

      val rdd = sc.makeRDD(List(1,2,3,4,5))
      // 获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
      var sum = sc.longAccumulator("sum"); //sum是累加器的名字
      rdd.map(
          num => {
              // 使用累加器
              sum.add(num)
              num
          }
      )
      // 获取累加器的值
      println("sum = " + sum.value)
      

      执行结果:0
      少加问题:如果在转换算子中调用累加器,如果没有行动算子的话,那么不会执行
      解决:添加一个行动算子

      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
      val sc = new SparkContext(conf)
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
      var sum = sc.longAccumulator("sum"); //sum是累加器的名字
      val value: RDD[Int] = rdd.map(
          num => {
              // 使用累加器
              sum.add(num)
              num
          }
      )
      value.collect()
      // 获取累加器的值
      println("sum = " + sum.value)
      

      执行结果为:sum = 15

    2. 多加问题

      val rdd = sc.makeRDD(List(1,2,3,4,5))
      // 获取系统的累加器,Spark默认提供了简单的数据聚合的累加器
      var sum = sc.longAccumulator("sum"); //sum是累加器的名字
      val mapRDD = rdd.map(
          num => {
              // 使用累加器
              sum.add(num)
              num
          }
      )
      mapRDD.collect()
      mapRDD.collect()
      // 获取累加器的值
      println("sum = " + sum.value)
      

      执行结果:30
      多加问题:累加器在SparkContext环境中是全局共享的,行动算子调用一次会执行一次,如果有多个行动算子,就会累加多次。

    3. 累加器一般使用方式

      解决方法:一般情况下,将累加器放在行动算子中进行操作。

  7. 自定义累加器

    object _04_diy {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(" ")
            val sc = new SparkContext(conf)
            val rdd = sc.makeRDD(List("hello","Spark","hello"))
            //todo 1.创建累加器
            val wcAcc = new myAcc()
            //todo 2.注册
            sc.register(wcAcc,"wordCountAcc")
            //todo 3.使用
            rdd.foreach(
                word=>{
                    wcAcc.add(word)
                }
            )
            //todo 4.获取
            println(wcAcc.value)
        }
    }
    //6个方法
    //泛型:IN 累加器输入的数据  out 累加器输出的数据类型
    class myAcc extends AccumulatorV2[String,mutable.Map[String,Long]] {
        //创建一个Map做返回值
        private var wcMap = mutable.Map[String,Long]()
        //累加
        override def add(word: String): Unit = {
            val cnt: Long = wcMap.getOrElse(word, 0l)
            wcMap.update(word,cnt+1)
        }
        override def value: mutable.Map[String, Long] = {
            wcMap
        }
        //Driver端合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
            val map1 = this.wcMap
            val map2 = other.value
    
            map2.foreach{
                case (word,cnt) =>{
                    val newCnt = map1.getOrElse(word,0l) + cnt
                    map1.update(word,newCnt)
                }
            }
        }
        override def isZero: Boolean = {
            wcMap.isEmpty
        }
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
            new  myAcc()
        }
        override def reset(): Unit = wcMap.clear()
    }
    

    Map(Spark -> 1, hello -> 2)

3.3、广播变量

概念

分布式共享只读变量

  1. 变量
  2. 分布式中执行的
  3. 共享:一个Executor中的多个Task共享

实现原理

在这里插入图片描述

  1. 如上图所示,当一个算子用到了Driver端的数据(图中的map),那么就会对此数据进行闭包,发送到Executor端进行运算
  2. 每个Task中都需要用到map(闭包数据),如果有十个分区意味着十个Task。场景如下:如果只有一个Executor,那么十个Task都在一台Executor上,而且这个Executor只有一个core,此时就是并发执行了。一个Executor放了十份数据,造成大量冗余占用内存。

    在这里插入图片描述

  3. 广播变量所解决的问题: Executor就是一个JVM,启动的时候会自动分配内存,完全可以将闭包数据放置在Executor内存中,达到Task共享的目的,如下所示,将Task的变量提取到Executor共享

    在这里插入图片描述


    注意:这个共享变量是不能被Task所修改的,只能被Task读取
    Spark中的广播变量将Task所需要的闭包数据保存到Executor内存中,由当前Executor中的多个Task共享,并不是每个Task享有一个。

广播变量用来高效分发较大的对象。向所有Executor只发送一个较大的只读值,以供一个或多个Task使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

广播变量的使用

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
//1. 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)

val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
    case (key, num) => {
        var num2 = 0
        // 2.使用广播变量的值
        for ((k, v) <- broadcast.value) {
            if (k == key) {
                num2 = v
            }
        }
        (key, (num, num2))
    }
}

4、spark SQL

4.1、核心编程

4.1.1、SparkSession

SparkSession介绍

  1. SparkCore RDD编程的起点 SparkContext
    Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
  1. SparkSQL编程起点

    老的版本中,SparkSQL提供两种SQL查询起始点:

    • 一个叫SQLContext,用于Spark自己提供的SQL查询;

    • 一个叫HiveContext,用于连接Hive的查询。

    新版本中,SparkSession是Spark最新的SQL查询起始点:
    实质上SparkSessionSQLContextHiveContext的组合,所以在SQLContexHiveContext上可用的API在SparkSession上同样是可以使用的。
    SparkSession在Spark 2.0 為 Hive 功能提供了內置支持,包括使用 HiveQL 編寫查詢的能力、訪問 Hive UDF 以及從 Hive 表讀取數據的能力。要使用這些功能,您不需要擁有現有的 Hive 設置。

  1. SparkSession和SparkContext的关系
    SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark框架会自动的创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样

SparkSession使用

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

4.1.2、DataFrame

  • 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
  • DataFrame与RDD的主要区别在于,前者带有schema元信息,也就是字段名

    在这里插入图片描述


    上图直观地体现了DataFrame和RDD的区别。
    左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。
  • DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
  • 同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从 API 易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。
  • Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。
  • DataFrame API 既有 transformation操作也有action操作
DataFrame开发演示

创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:

  • 通过Spark的数据源进行创建
  • 从一个存在的RDD进行转换
  • 从Hive Table进行查询返回
  1. 读取文件数据来创建dataframe

    1. 查看Spark支持创建文件的数据源格式

      scala> spark.read.
      csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
      

      json文件、csv文件中的数据都带有字段信息,所以可以直接映射成DataFrame

    2. 在spark的bin/data目录中创建user.json文件

      {"username":"zhangsan","age":20}
      
    3. 读取json文件创建DataFrame

      scala> val df = spark.read.json("data/user.json")
      df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
      

      注意:

      • 如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;

      • 但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换

    4. 展示结果

      +---+--------+
      z
      +---+--------+
      | 20|zhangsan|
      +---+--------+
      
  2. 从RDD进行转换

  3. 从Hive Table进行查询返回`

DataFrame的SQL语法开发

SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

  1. 读取JSON文件创建DataFrame

    scala> val df = spark.read.json("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
    
  2. 对DataFrame创建一个临时视图

    scala> df.createOrReplaceTempView("people")
    
  3. 通过SQL语句实现查询全表

    scala> val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
  4. 结果展示

    scala> sqlDF.show
    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30|     lisi|
    | 40|   wangwu|
    +---+--------+
    

临时表和全局表

  • 普通临时表是一个SparkSession范围内的
  • 如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
  1. 对于DataFrame创建一个全局表

    scala> df.createGlobalTempView("people")
    
  2. 通过SQL语句实现查询全表

    //在session1中查询
    scala> spark.sql("SELECT * FROM global_temp.people").show()
    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30|     lisi|
    | 40|   wangwu|
    +---+--------+
    
    //在session2中查询
    scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30|     lisi|
    | 40|   wangwu|
    +---+--------+
    

DSL语法开发

  • DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL
  • 使用 DSL 语法风格不必去创建临时视图了
  • DSL中函数其实就相当于RDD的算子
  1. 创建一个DataFrame

    scala> val df = spark.read.json("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
  2. 查看DataFrame的Schema信息

    scala> df.printSchema
    root
     |-- age: Long (nullable = true)
     |-- username: string (nullable = true)
    
  3. 只查看"username"列数据

    scala> df.select("username").show()
    +--------+
    |username|
    +--------+
    |zhangsan|
    |     lisi|
    |   wangwu|
    +--------+
    
  4. 查看"username"列数据以及"age+1"数据
    注意:涉及到运算的时候, 每列都必须使用\$, 或者采用引号表达式:单引号+字段名

    scala> df.select($"username",$"age" + 1).show
    scala> df.select('username, 'age + 1).show()
    scala> df.select('username, 'age + 1 as "newage").show()
    
    +--------+---------+
    |username|(age + 1)|
    +--------+---------+
    |zhangsan|       21|
    |    lisi|       31|
    |  wangwu|       41|
    +--------+---------+
    
  5. 查看"age"大于"30"的数据

    scala> df.filter($"age">30).show
    +---+---------+
    |age| username|
    +---+---------+
    | 40|    wangwu|
    +---+---------+
    
  6. 按照"age"分组,查看数据条数

    scala> df.groupBy("age").count.show
    +---+-----+
    |age|count|
    +---+-----+
    | 20|    1|
    | 30|    1|
    | 40|    1|
    +---+-----+
    

RDD和DataFrame转换

RDD没有结构信息,DataFrame有结构信息

  1. RDD转换为DataFrame

    1. 方式1:RDD直接转
      RDD.toDF(列名列表)

      scala> val idRDD = sc.textFile("data/id.txt")
      scala> idRDD.toDF("id").show
      +---+
      | id|
      +---+
      |  1|
      |  2|
      |  3|
      |  4|
      +---+
      

      注意:

      • 在IDEA中开发程序时,如果需要RDD与DF或者DS之间互相操作,那么需要引入 import spark.implicits._

      • 这里的spark不是Scala中的包名,而是创建的sparkSession对象的变量名称,所以必须先创建SparkSession对象再导入。

      • 这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入。

      • spark-shell中无需导入,自动完成此操作。

    2. 方式2:样例类转
      实际开发中,一般通过样例类将RDD转换为DataFrame

      scala> case class User(name:String, age:Int)
      defined class User
      scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show
      +--------+---+
      |     name|age|
      +--------+---+
      |zhangsan| 30|
      |    lisi| 40|
      +--------+---+
      
  2. DataFrame转换为RDD

    DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD

    scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> val rdd = df.rdd
    rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25
    
    scala> val array = rdd.collect
    array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
    

    注意:此时得到的RDD存储类型为Row

    scala> array(0)
    res28: org.apache.spark.sql.Row = [zhangsan,30]
    scala> array(0)(0)
    res29: Any = zhangsan
    scala> array(0).getAs[String]("name")
    res30: String = zhangsan
    

4.1.3、DataSet

  • DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象
  • 是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。
  • DataSet是DataFrame API的一个扩展,是SparkSQL最新的数据抽象
  • 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
  • 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
  • DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。
    DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将DataFrame转换为DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。获取数据时需要指定顺序
创建DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

通过序列创建DataSet

  1. 使用样例类序列创建DataSet

    scala> case class Person(name: String, age: Long)
    defined class Person
    
    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
    
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
    
    scala> caseClassDS.show
    +---------+---+
    |     name|age|
    +---------+---+
    | zhangsan|  2|
    +---------+---+
    
  2. 使用基本类型的序列创建DataSet

    scala> val ds = Seq(1,2,3,4,5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> ds.show
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    +-----+
    

    注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

通过RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
DataSet转换为RDD

DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

4.1.4、DataFrame和DataSet转换

DataFrame其实是DataSet的特例,所以它们之间是可以互相转换的。

DataFrame转换为DataSet

scala> case class User(name:String, age:Int)
defined class User

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet转换为DataFrame

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

4.1.5、三者的互相转换

在这里插入图片描述

4.1.6、RDD、DataFrame、DataSet三者的关系

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:

  • Spark1.0 => RDD
  • Spark1.3 => DataFrame
  • Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。
不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。

三者的共性

  • RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
  • 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
  • 三者有许多共同的函数,如filter,排序等;
  • 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有partition的概念
  • DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型

三者的区别

  1. RDD
  • RDD一般和spark mllib同时使用
  • RDD不支持sparksql操作
  1. DataFrame
  • 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
  • DataFrame与DataSet一般不与 spark mllib 同时使用
  • DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行 sql 语句操作
  • DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
  1. DataSet
  • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame其实就是DataSet的一个特例 type DataFrame = Dataset[Row]
  • DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

4.1.7、IDEA开发SparkSQL

添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

代码演示

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSQL01_Demo {
    def main(args: Array[String]): Unit = {
        //todo 1.创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

        //todo 2.创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        //todo 3.隐式转换,为了用dataFrame dataSet
        //RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换
        //spark不是包名,是上下文环境对象名
        import spark.implicits._

        //读取json文件 创建DataFrame  {"username": "lisi","age": 18}
        val df: DataFrame = spark.read.json("input/test.json")
        //df.show()

        //todo 4. DataFrame 的 SQL风格语法
        df.createOrReplaceTempView("user")
        //spark.sql("select avg(age) from user").show

        //todo 5.DSL风格语法
        //df.select("username","age").show()

        //todo 6. *****RDD=>DataFrame=>DataSet*****
        //RDD
        val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",20)))

        //DataFrame
        val df1: DataFrame = rdd1.toDF("id","name","age")
        //df1.show()

        //DateSet
        val ds1: Dataset[User] = df1.as[User]
        //ds1.show()

        //*****DataSet=>DataFrame=>RDD*****
        //DataFrame
        val df2: DataFrame = ds1.toDF()

        //RDD  返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
        val rdd2: RDD[Row] = df2.rdd
        //rdd2.foreach(a=>println(a.getString(1)))

        //*****RDD=>DataSet*****
        rdd1.map{
            case (id,name,age)=>User(id,name,age)
        }.toDS()

        //*****DataSet=>=>RDD*****
        ds1.rdd

        //释放资源
        spark.stop()
    }
}
case class User(id:Int,name:String,age:Int)

4.2、自定义函数

用户可以通过spark.udf功能添加自定义函数,实现自定义功能

4.2.1、UDF

步骤:

  1. 创建DataFrame

    scala> val df = spark.read.json("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
    
  2. 注册UDF

    scala> spark.udf.register("addName",(x:String)=> "Name:"+x)
    res9: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
    
  3. 创建临时表

    scala> df.createOrReplaceTempView("people")
    
  4. 应用UDF

    scala> spark.sql("Select addName(name),age from people").show()
    

4.2.2、UDAF

UDAF原理

需求:计算平均工资

在这里插入图片描述

一个需求可以采用很多种不同的方法实现需求

  1. 实现方式 - RDD

    val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val res: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangw", 40))).map {
      case (name, age) => {
        (age, 1)
      }
    }.reduce {
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    }
    println(res._1/res._2)
    // 关闭连接
    sc.stop()
    
  2. 实现方式 - 累加器

    class MyAC extends AccumulatorV2[Int,Int]{
        var sum:Int = 0
        var count:Int = 0
        override def isZero: Boolean = {
            return sum ==0 && count == 0
        }
    
        override def copy(): AccumulatorV2[Int, Int] = {
            val newMyAc = new MyAC
            newMyAc.sum = this.sum
            newMyAc.count = this.count
            newMyAc
        }
    
        override def reset(): Unit = {
            sum =0
            count = 0
        }
    
        override def add(v: Int): Unit = {
            sum += v
            count += 1
        }
    
        override def merge(other: AccumulatorV2[Int, Int]): Unit = {
            other match {
                case o:MyAC=>{
                    sum += o.sum
                    count += o.count
                }
                case _=>
            }
    
        }
    
        override def value: Int = sum/count
    }
    
  3. 实现方式 - UDAF - 弱类型

    强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。

    • 通过继承UserDefinedAggregateFunction来实现用户自定义弱类型聚合函数。
    • 从Spark3.0版本后UserDefinedAggregateFunction已经不推荐使用了。可以统一采用强类型聚合函数Aggregator
    • 弱类型的特点就是只能通过ROW的索引获取对应的字段,强类型可以直接通过类的属性获取
    /*
    定义类继承UserDefinedAggregateFunction,并重写其中方法
    */
    class MyAveragUDAF extends UserDefinedAggregateFunction {
    
      // 聚合函数输入参数的数据类型
      def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))
    
      // 聚合函数缓冲区中值的数据类型(age,count)
      def bufferSchema: StructType = {
        StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
      }
    
      // 函数返回值的数据类型
      def dataType: DataType = DoubleType
    
      // 稳定性:对于相同的输入是否一直返回相同的输出。
      def deterministic: Boolean = true
    
      // 函数缓冲区初始化
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        // 存年龄的总和
        buffer(0) = 0L
        // 存年龄的个数
        buffer(1) = 0L
      }
    
      // 更新缓冲区中的数据
      def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getInt(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
    
      // 合并缓冲区
      def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
    
      // 计算最终结果
      def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    
    。。。
    
    //创建聚合函数
    var myAverage = new MyAveragUDAF
    
    //在spark中注册聚合函数
    spark.udf.register("avgAge",myAverage)
    
    spark.sql("select avgAge(age) from user").show()
    
  4. 实现方式 - UDAF - 强类型

    package SparkSQL
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
    import org.apache.spark.sql.expressions.Aggregator
    
    object _03_UDAF {
    
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
        val df: DataFrame = spark.read.json("input/user.json")
        df.createOrReplaceTempView("user")
        //todo 注册自定义函数
        //sql不关注类型,所以将强类型操作转换为弱类型
        spark.udf.register("ageAvg",functions.udaf(new MyAvgUDAF()))
    
        spark.sql("select ageAvg(age) from user").show
        //+--------------+
        //|myavgudaf(age)|
        //+--------------+
        //|            20|
        //+--------------+
    
        spark.close()
      }
    }
    
    //todo 1.自定义聚合函数:计算年龄平均值
    
    //1.继承org.apache.spark.sql.expressions.Aggregator
    //2.泛型  IN:输入数据类型   BUF:buffer中的数据类型  OUT:输出的数据类型
    case class Buff(var total:Long,var count:Long)
    //用样例类作为缓冲区的数据类型,total是总的薪资,count是个数
    //用var修饰属性,是因为银行里类默认是val不能修改
    
    class MyAvgUDAF extends Aggregator[Long,Buff,Long] {
      //todo 3.缓冲区初始化
      override def zero: Buff = Buff(0L,0L)
      //todo 4.根据输入的数据更新缓冲区中的数据
      override def reduce(buff: Buff, in: Long): Buff = {
        buff.total = buff.total + in
        buff.count = buff.count + 1
        buff
      }
      //todo 5.合并缓冲区
      override def merge(buff1: Buff, buff2: Buff): Buff = {
        buff1.total = buff1.total + buff2.total
        buff1.count = buff1.count + buff2.count
        buff1
      }
      //todo 6.计算结果
      override def finish(buff: Buff): Long = {
        buff.total/buff.count
      }
      //todo 7.分布式计算 需要将数据进行网络中传输,所以涉及缓冲区序列化和编码问题
      //缓冲区的编码操作  自定义类就用这个Encoders.product
      override def bufferEncoder: Encoder[Buff] = Encoders.product
      //输出的编码操作
      override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
    

4.3、数据加载和保存

4.3.1、通用的加载和保存方式

SparkSQL提供了通用的保存数据和数据加载的方式。

  • 这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据
查看SparkSql能读取的文件格式
scala> spark.read.

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
  • SparkSQL默认读取和保存的文件格式为parquet
  • spark.read.load 是加载数据的通用方法
使用load加载数据
  • 由于SparkSQL默认读取和保存的文件格式为parquet,如果读取不同格式的数据,可以对不同的数据格式进行设定

  • 通用加载数据的语法格式:

    scala> spark.read.format("…")[.option("…")].load("…")
    
  • format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”。

  • load("…"):在"csv"、“jdbc”、“json”、“orc”、"parquet"和"textFile"格式下需要传入加载数据的路径。

  • option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

  • 我们前面都是使用read API 先把文件加载到 DataFrame然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.文件路径

    scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
    
使用save保存数据

df.write.save 是保存数据的通用方法

查看SparkSql能保存的文件格式

scala>df.write.
csv  jdbc   json  orc   parquet textFile… …

保存通用格式

如果保存不同格式的数据,可以对不同的数据格式进行设定

scala>df.write.format("…")[.option("…")].save("…")
  • format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”。
  • save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。
  • option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
  • 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用mode()方法来设置。
    有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
    SaveMode是一个枚举类,其中的常量包括:

    在这里插入图片描述

df.write.mode("append").json("/opt/module/data/output")

Parquet

  • Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。
  • 数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。
  1. 加载数据
scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.show
  1. 保存数据
scala> var df = spark.read.json("/opt/module/data/input/people.json")
//保存为parquet格式
scala> df.write.mode("append").save("/opt/module/data/output")

读写MySQL

  • Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
  • 如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。
bin/spark-shell 
--jars mysql-connector-java-5.1.27-bin.jar

在Idea中通过JDBC对Mysql进行操作

  1. 导入依赖

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    
  2. 读取数据

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    
    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
    import spark.implicits._
    
    //方式1:通用的load方法读取
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://linux1:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123123")
      .option("dbtable", "user")
      .load().show
    
    
    //方式2:通用的load方法读取 参数另一种形式
    spark.read.format("jdbc")
      .options(Map("url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
        "dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
    
    //方式3:使用jdbc方法读取
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123123")
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
    df.show
    
    //释放资源
    spark.stop()
    
  3. 写入数据

    case class User2(name: String, age: Long)
    。。。
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    
    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    
    val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))
    val ds: Dataset[User2] = rdd.toDS
    //方式1:通用的方式  format指定写出类型
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://linux1:3306/spark-sql")
      .option("user", "root")
      .option("password", "123123")
      .option("dbtable", "user")
      .mode(SaveMode.Append)
      .save()
    
    //方式2:通过jdbc方法
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123123")
    ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
    
    //释放资源
    spark.stop()
    

Hive

  • Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。
  • 若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
    spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
  1. 内嵌的HIVE

    • 如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
    • Hive 的元数据存储在 derby 数据库中, 默认仓库地址:$SPARK_HOME/spark-warehouse (可以用于识别是否使用的是内置的hive,如果是,则会生成这个目录)
    scala> spark.sql("show tables").show
    。。。
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    +--------+---------+-----------+
    
    scala> spark.sql("create table aa(id int)")
    
    。。。
    
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|       aa|      false|
    +--------+---------+-----------+
    
    向表加载本地数据
    scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
    
    。。。
    
    scala> spark.sql("select * from aa").show
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
    在实际使用中, 几乎没有任何人会使用内置的 Hive
    
  2. 外部的HIVE

    如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

    1. Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
    2. 把Mysql的驱动copy到jars/目录下
      因为hive元数据保存在Mysql中
    3. 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
    4. 重启spark-shell
    • spark-shell操作外部的hive

      scala> spark.sql("show tables").show
      20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
      +--------+--------------------+-----------+
      |database|           tableName|isTemporary|
      +--------+--------------------+-----------+
      | default|                 emp|      false|
      | default|hive_hbase_emp_table|      false|
      | default| relevance_hbase_emp|      false|
      | default|          staff_hive|      false|
      | default|                 ttt|      false|
      | default|   user_visit_action|      false|
      +--------+--------------------+-----------+
      
    • Spark SQL CLI

      • Spark-shell中写sql,还得写spark.sql(xxx).show 比较麻烦,
      • Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口
      bin/spark-sql
      
    • Spark beeline

      Spark Thrift Server是Spark社区基于HiveServer2实现的一个Thrift服务。旨在无缝兼容HiveServer2。因为Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hive的beeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的也只是取代HiveServer2,因此它依旧可以和Hive Metastore进行交互,获取到hive的元数据。

      如果想连接Thrift Server,需要通过以下几个步骤:
       Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
       把Mysql的驱动copy到jars/目录下
       如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
       启动Thrift Server

      sbin/start-thriftserver.sh
      

       使用beeline连接Thrift Server

      bin/beeline -u jdbc:hive2://linux1:10000 -n root
      

      在这里插入图片描述

    • 代码操作Hive

      1. 导入依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        
      2. 将hive-site.xml文件拷贝到项目的resources目录中,(target的classes目录下有才能起作用,否则跑的还是内置的hive)

      3. 启用Hive支持,默认不支持Hive

        //创建SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .enableHiveSupport()
          .master("local[*]")
          .appName("sql")
          .getOrCreate()
        

        注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址:

        config("spark.sql.warehouse.dir", "hdfs://linux1:8020/user/hive/warehouse")
        

        如果在执行操作时,出现如下错误:
        该错误是权限问题,用户做HDFS没有相应的权限

        在这里插入图片描述

        可以代码最前面增加如下代码解决:

        System.setProperty("HADOOP_USER_NAME", "root")
        

        此处的root改为你们自己的hadoop用户名称

4.4、内置函数

官方链接

5、SparkStreaming

5.1、概述

流式数据处理:来一条处理一条
批量数据处理:一起处理多条数据

实时处理数据:数据处理的延迟是毫秒级别
离线数据处理:数据处理的延迟是小时甚至是天

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等

在这里插入图片描述


和Spark 基于 RDD 的概念很相似,Spark Streaming使用离散化流(discretized stream 作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD 存在,而 DStream是由这些RDD 所组成的序列(因此得名“离散化”)。所以简单来说,DStream就是对RDD在实时数据处理场景的一种封装,DStream本质上就是一系列时间上连续的RDD,即DStream=>Seq[RDD]

Spark Streaming是一种准实时(数据处理延迟在秒或者分钟)、微批次(几秒处理一次数据)的数据处理框架

在这里插入图片描述

背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数"spark.streaming.receiver.maxRate"的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于 maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率

通过属性"spark.streaming.backpressure.enabled"来控制是否启用backpressure机制,默认值false,即不启用

5.2、使用

5.2.1、基础环境 (wordcount测试)

添加依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.12</artifactId>
	<version>3.0.0</version>
</dependency>
12345

由于我们是流式处理,采集器不可以关闭!,我们监听了本地的9999端口,并且通过netcat发送数据,模拟流式数据

object WcDemo {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext  采集周期为3秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 业务处理.....
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[(String, Int)] = line.flatMap(_.split("")).map((_, 1))

    val value: DStream[(String, Int)] = words.reduceByKey(_ + _)

    value.print()



    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

在这里插入图片描述


在这里插入图片描述


对数据的操作也是按照RDD 为单位来进行的

在这里插入图片描述

5.2.2、DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD 中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果 StreamingContext中没有设定输出操作,整个context就都不会启动

  • print():在运行流程序的驱动结点上打印DStream 中每一批次数据的最开始10 个元素。这用于开发和调试
  • saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个DStream的内容
  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles
  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files
  • foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个RDD中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库
    通用的输出操作foreachRDD(),它用来对DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意RDD。在 foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中

5.2.3、自定义采集器 — 对接Kafka

使用KafkaUtils.createDirectStream API 指定主题后和属性后,为了后续方便,我们会将接收到的数据封装到样例类

LocationStrategies.PreferConsistent,这个策略会将分区(kafka的分区)分布到所有可获得的executor上。如果你的executor和kafkabroker在同一主机上的话,可以使用PreferBrokers,这样kafka leader会为此分区进行调度。最后,如果你加载数据有倾斜的话可以使用PreferFixed,这将允许你制定一个分区和主机的映射(没有指定的分区将使用PreferConsistent 策略)

ConsumerStrategies.Subscribe,你可以使用它去订阅一个topics集合

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Stream")
      val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

      // kafka消费者配置
      val kafkaParam = Map(
        "bootstrap.servers" -> "hadoop102:9092" ,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "1000",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (true: java.lang.Boolean)
      )
      // 泛型是接收的kafka数据K-V
      val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("topicx"), kafkaParam))
      // 封装点击数据到样例类
      val clickData: DStream[AdClickData] = kafkaDataDS.map(data => {
        val datas: Array[String] = data.value().split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      })

      

      ssc.start()
      ssc.awaitTermination()
    }

    case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)
  }
}

5.2.4、DStream - 有状态转化

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window 相关的原语

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在 Scala中使用

在这里插入图片描述


需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据

虽然都是hello,但是是无状态的,也是说,这两个统计是没有关系的,也即我们不会保存任何一个采集周期的数据,下图,第一个三秒告诉我们hello为1,第二个3秒告诉我们hello为2,各个周期数据独立

在这里插入图片描述


实现有状态(保存前面采集周期的数据)

需要DS的原语,注意原语只是个名称,类似RDD的算子,实际上就是方法而已

updateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream

updateStateByKey() 的结果会是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的,比如对wordcount而言,键就是每个word对应的统计值,状态就是某个缓冲区

updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

1.定义状态,状态可以是一个任意的数据类型
2.定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新

在这里插入图片描述

使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态

以下程序中,由于ByKey操作已经帮我们区分好了key,所以键就是一个seq集合来记录每个单词的count,缓冲区就是一个记录总和的变量

object WcDemo {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext  采集周期为3秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")


    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[(String, Int)] = line.flatMap(_.split("")).map((_, 1))

    val value: DStream[(String, Int)] = words.updateStateByKey((seq: Seq[Int], opt: Option[Int]) => {
      val newcount = opt.getOrElse(0) + seq.sum
      Option(newcount)
    })

    value.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

5.2.5、DStream - 无状态操作 Transform

先看代码,我们监视9999端口一行输入一个word,进行wordcount测试,有两种方法

val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// 方法一 耳详能熟

// ** Driver代码
line.map(str =>{
	// 这里的代码只可以运行在Executor
  (str,1)
}).reduceByKey(_+_)

// 方法二 使用Transform原语 

// ** Driver代码 
val res: DStream[(String, Int)] = line.transform(rdd => {
  println("我是Driver端的代码")  // 这里可以运行Driver端代码
  rdd.map(str => {
  // 这里的代码只可以运行在Executor
    (str, 1)
  })
}).reduceByKey(_ + _)

也就是说,transform原语可以让Driver端的代码周期性执行!!

主要作用:补充一些DStream的功能

在这里插入图片描述

5.2.6、DStream - 无状态操作 join

一句话:两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相通,类似MYSQL的内连接

object JoinTest {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext  采集周期为3秒
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val line1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = line.map((_, 1))

    val map8888: DStream[(String, Int)] = line1.map((_, 1))

    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

在这里插入图片描述

5.2.7、滑动窗口常用函数

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态,所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,可以只传一个参数

  • 窗口时长:计算内容的时间范围
  • 滑动步长:隔多久触发一次计算

窗口时长必须大于等于滑动步长

在这里插入图片描述

window

示例:window(Seconds(10),Seconds(5)) 5秒处理一次过去10秒的数据

注意:这两者都必须为采集周期大小的整数倍

object Window {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext  采集周期为5秒
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val windowData: DStream[String] = line.window(Seconds(10),Seconds(5))

    val wordToOne: DStream[(String, Int)] = windowData.map((_, 1))
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

    wordToCount.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }

}

当窗口时长大于滑动步长时,有数据会重复计算

当窗口时长等于滑动步长时,不会出现重复计算的问题!

countByWindow

countByWindow(windowLength, slideInterval)

返回一个滑动窗口计数流中的元素个数

示例:countByWindow(Seconds(10), Seconds(5)) 每5秒计算过去10秒内的元素个数

reduceByWindow

reduceByWindow(func, windowLength, slideInterval)
通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流

val count: DStream[String] = line.reduceByWindow(_ + _, Seconds(5), Seconds(5))
1

每5秒对5秒内的元素进行拼接

在这里插入图片描述

reduceByKeyAndWindow - 1

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值,这里的函数必须写完整

reduceByKeyAndWindow - 2

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于可逆的 reduce 函数,也就是这些 reduce函数有相应的反reduce函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置

在这里插入图片描述

重点 根据图片看这两种原语的区别

假设我们就输入这些数据

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

第一次计算
10秒内数据是
(a,1) (b,1) (c,1) (d,1)

第二次计算
(c,1) (e,1) (d,1) (a,1)

第三次计算
啥都没有

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

第一次计算
10秒内数据是
(a,1) (b,1) (c,1) (d,1)

第二次计算
(c,1) (b,0) (e,1) (d,1) (a,1)

第三次计算
(c,0) (b,0) (e,0) (d,0) (a,0)

5.2.8优雅关闭DStream

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了
使用外部文件系统来控制内部程序关闭

这里第二个参数的意思是,处理完当前已经接收的数据后再关闭

ssc.stop(true,true)
1

在这里插入图片描述


但是我们应该注意到

 // 启动采集器
ssc.start()
// 等待采集器的关闭
ssc.awaitTermination()  // 阻塞main线程

ssc.stop(true,true)

ssc.awaitTermination() 阻塞当前线程,也就是说stop根本执行不到

最好的办法是创建一个线程,根据外部文件的标志位来结束

class StopM (ssc: StreamingContext) extends Runnable{
  override def run(): Unit = {

    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "gzhu")

    while (true) { 
       try {
         Thread.sleep(5000)
       }catch {
         case e: InterruptedException => e.printStackTrace()
       }
      
      val state: StreamingContextState = ssc.getState
      val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark")) 
      if (bool) {
        if (state == StreamingContextState.ACTIVE) { ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

数据恢复问题,可以设置一个检查点

object Resume {
  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => {
      //1.初始化 Spark 配置信息
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

      //2.初始化 SparkStreamingContext  采集周期为3秒
      val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

      ssc
    })

    ssc.checkpoint("cp")

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()

    new Thread(new StopM(ssc)).start()
  }
}

6、spark任务调度机制

6.1、Job & Stage & Task

一个 Spark 应用程序包括 Job、Stage 以及 Task 三个概念:

  1. Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;
  2. Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;
  3. Task 是 Stage 的子集,以并行度(分区数)来衡量,一个stage中最后一个RDD有多少个分区,则有多少个 task。

6.2、Spark 任务调度概述

Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总体调度流程如下图所示:

在这里插入图片描述


Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler和 TaskScheduler。

  • DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度。
  • TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend(调度器) 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统。

6.2.1、Spark Stage 级调度

Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交,下图是涉及到 Job提交的相关方法调用流程图

在这里插入图片描述

  1. Job 由最终的 RDD 和 Action 方法封装而成;
  2. SparkContext 将 Job 交给 DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之间被划分到同一个 Stage 中,可以进行 pipeline 式的计算。
  3. 划分的 Stages 分两类,一类叫做ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount。

    在这里插入图片描述

  • Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据 RDD 之间的依赖关系从 RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD3 依赖 RDD-2,并且是宽依赖,所以在 RDD-2 和 RDD-3 之间划分 Stage,RDD-3 被划到最后一个 Stage,即 ResultStage 中,RDD-2 依赖 RDD-1,RDD-1 依赖 RDD-0,这些依赖都是窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,形成 pipeline 操作。即ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行 RDD-0 到 RDD-2 的转化。
  • 不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。
    一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。
  • Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task
  • 另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。

相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂,下面详细阐述其细节。

6.2.2、Spark Task 级调度

Spark Task 的调度是由 TaskScheduler 来完成,由前文可知,DAGScheduler 将 Stage 打包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中,TaskSetManager 结构如下图所示。

在这里插入图片描述


TaskSetManager 负 责监控 管理 同一 个 Stage 中的 Tasks, TaskScheduler 就是以TaskSetManager 为单元来调度任务

  • 前面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend 是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler 有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler 在 SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行,大致方法调用流程如下图所示:

    在这里插入图片描述


    上图中,将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend 的riviveOffers 方法给 driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor 都是任务启动时反向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer 对象;准备好计算资源(WorkerOffer)后,taskScheduler 基于这些资源调用 resourceOffer 在 Executor 上分配 task。

6.2.3、调度策略

  • TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。

1) FIFO 调度策略
如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入队,出队时直接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保存在一个 FIFO 队列中。

在这里插入图片描述


\2) FAIR 调度策略
FAIR 调度策略的树结构如下图所示:

在这里插入图片描述


FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的TaskSetMagager。

在 FAIR 模式中,需要先对子 Pool 进行排序,再对子 Pool 里面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。

排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性:

  • runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare 值以及 weight 值。

注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置。

6.2.4、本地化调度

DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的
tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task
的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置,由于一个
partition 对应一个 Task,此 partition 的优先位置就是 task 的优先位置,对于要提交到
TaskScheduler 的 TaskSet 中的每一个 Task,该 task 优先位置与其对应的 partition 对应的优先
位置一致。
从调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到
Executor 上执行。前面也提到,TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理
调度这些 Task。
根据每个 Task 的优先位置,确定 Task 的 Locality 级别,Locality 一共有五种,优先级
由高到低顺序:

在这里插入图片描述


在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个
task 以 X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,
此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该
task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有
相应的资源去执行此 task,这就在在一定程度上提到了运行性能。

6.2.5、失败重试与黑名单机制

除了选择合适的 Task 调度运行外,还需要监控 Task 的执行状态,前面也提到,与外部打交道的是 SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给 SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task的失败与成功状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。

在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和 Host,这样下次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录 Task 上一次失败所在的 Executor Id 和 Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个 Task 了

在这里插入图片描述

6.2.6、应用程序的执行

Task数量和分区的关系

(1)窄依赖
窄依赖数据分区到分区,task个数不变,还是n个task;
窄依赖是 上游对下游是多对一的关系,上游一个分区的数据只会进入下游一个分区;

在这里插入图片描述


(2)宽依赖
宽依赖前后的Task数量会改变,shuffle前task数量等于分区数n,shuffle后task数量等于分区数m,一共n+m个task

在这里插入图片描述

阶段的划分
  • 对于宽依赖来说,下游RDD分区的数据是经过上游RDD各个分区数据打乱重组的,因此,上游RDD必须每个分区的数据都准备好,下游RDD才能进行运算
  • 对于窄依赖来说,分区之间可以相互独立运行,分区1不需要等分区2,因此不需要阶段划分

    在这里插入图片描述


    DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

    在这里插入图片描述

在这里插入图片描述


阶段之间是需要写磁盘的;

阶段和shuffle有必然的联系
阶段划分源码

(1)从行动算子进去

在这里插入图片描述


一直到

在这里插入图片描述

在这里插入图片描述

(1) 根据触发行动算子的RDD创建ResultStage
(2) 然后由此RDD沿着依赖往前追溯,如果是shuffleDependency,就会创建ShuffleMapStage
(3) 结论就是:
① 阶段数量= shuffle依赖数量+1,这个1其实指的就是ResultStage,因为没有shuffle依赖,也会有ResultStage;
② ResultStage永远只有一个,就是最后需要执行的stage。

RDD 任务划分

四个概念:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即生成一个Application;
  • Job:一个Action算子就会生成一个Job;
  • Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
  • task

(1)一个Stage阶段中,最后一个RDD的分区个数就是当前Stage的Task的个数。
(2)一个作业(Application)的task个数就是所有stage的task个数总和

Application->Job->Stage->Task每一层都是1对n的关系。

  • 一个Application可以有多个行动算子
  • 一个job有>=0数量的shuffle依赖
  • 一个stage中最后的Rdd有>=1的分区,一个分区对应一个任务
Task的数量

没有阶段划分,任务数量怎么来的
①submitStage(finalStage)
②val missing = getMissingParaentStage(stage)
③submitMissingTasks(stage,jobId.get) 提交没有上一阶段的tasks
④ Seq[Task]:当前阶段中所有的task

在这里插入图片描述


1.Task集合的来源:
匹配当前阶段的类型(说明不同阶段的任务是不一样的)

在这里插入图片描述


在这里插入图片描述


\2. 从上面的截图中可以看出partitionsToCompute的map算子里面创建Task,因为map是一一映射,所以task数量取决于partitionsToCompute

在这里插入图片描述


在这里插入图片描述


结论:
Job.numPartitions来自于阶段中最后一个RDD的分区个数,所以一个阶段中的Task数量=当前阶段中最后一个RDD的分区个数

Task种类的划分

Task的种类和stage是挂钩的
Stage分为ShuffleMapStage和 ResultStage

在这里插入图片描述

任务调度

任务调度就是怎么放怎么取
(1)任务切分后,放置

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


(2)任务都放进Pool后,等待取

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

任务执行

在这里插入图片描述

在这里插入图片描述


在这里插入图片描述

总结

在这里插入图片描述

  • DAGScheduler将每个Stage拆分成Tasks

在这里插入图片描述

7、spark shuffle

https://mp.weixin.qq.com/s/S90onC4sOJ77kwUc4SNvvg

7.1、Shuffle 的核心要点

7.1.1、ShuffleMapStage 与 ResultStage

在这里插入图片描述

  • 在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个ResultStage对象,前面的所有 stage 被称为ShuffleMapStage
  • ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘,每个Stage的开始伴随着从磁盘中读取shuffle文件
  • ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition的数据集上,意味着一个 job 的运行结束。

shuffle为什么一定会落盘呢?

  • 因为shuffle下游的一个分区会依赖上游多个分区,这样一来,必须得等到上游所有分区执行完毕,下游分区才能获取完整数据进行运算;
  • 如果上游某些分区先运算完毕,在内存中等待,会造成内存挤压!所以必须写到磁盘中进行等待

    在这里插入图片描述

那么除了上游预聚合减少shuffle数据量可以提高shuffle性能以外还有什么方法呢?

7.1.2、Spark Shuffle 历程

Spark Shuffle 分为两种:

  • 一种是基于 Hash 的 Shuffle;
  • 另一种是基于 Sort 的 Shuffle。

先介绍下它们的发展历程,有助于我们更好的理解 Shuffle

发展过程

  • Spark 1.1 之前, Spark 中只实现了一种 Shuffle 方式,即基于 Hash 的 Shuffle 。
  • Spark 1.1 版本中引入了基于 Sort 的 Shuffle 实现方式
  • Spark 1.2 版本之后,默认的实现方式从基于 Hash 的 Shuffle 修改为基于 Sort 的 Shuffle 实现方式,即使用的 ShuffleManager 从默认的 hash 修改为 sort。
  • 在 Spark 2.0 版本中, Hash Shuffle 方式己经不再使用。

HashShuffle

Spark 之所以一开始就提供基于 Hash 的 Shuffle 实现机制,其主要目的之一就是为了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是将 sort 作为固定步骤,有许多并不需要排序的任务,MapReduce 也会对其进行排序,造成了许多不必要的开销。

在基于 Hash 的 Shuffle 实现方式中,每个 Mapper 阶段的 Task 会为每个 Reduce 阶段的 Task 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中, M 表示 Mapper 阶段的 Task 个数, R 表示 Reduce 阶段的 Task 个数) 伴随大量的随机磁盘 I/O 操作与大量的内存开销。

7.1.3、HashShuffle 解析

未优化的 HashShuffle

https://www.bilibili.com/video/BV11A411L7CK?p=147

这里我们先明确一个假设前提:每个 Executor 只有 1 个 CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。

在这里插入图片描述

下游有 3 个 Reducer,当MapTask运算结束 进入shuffle 需要落盘,如果MapTask只将数据写入一个文件的话,下游ReduceTask 就不知道从文件哪个位置获取属于自己分区的数据;因此MapTask 在落盘的时候,写入ReduceTask个数 个文件中,这样Reducer 会在每个 Task 中把属于自己类别的数据收集过来,进行聚合运算;

总结 未优化的HashShuffle的特点:

  • MapTask 溢写磁盘时 文件个数 = mapTask reduceTask
  • 会产生大量小文件

优化后的 HashShuffle

1.并发MapTask合并机制

  • 优化的 HashShuffle 过程就是启用合并机制
  • 合并机制就是对于同一个CPU Core 并发的MapTask 共用同一个 buffer
  • 开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。
  • 通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。

在这里插入图片描述

优化后的HashShuffle:

  • 同一个Core并发的MapTask共用同一个Buffer
  • MapTask溢写文件个数 = cpu Core 个数 * ReduceTask个数
  • 缓解了未优化的小文件压力,但不彻底
终极优化

在这里插入图片描述

  • 一个core只产生一个文件,附带一个索引文件;
  • 此优化方案是现在用的SortShuffle

7.2、ShuffleStage的读写

shuffleStage分为 ShuffleMapStage和 ShuffleReduceStage
ShuffleMapStage写磁盘, ShuffleReduceStage读取磁盘;

7.2.1、ShuffleManager

其实就是研究上游是如何往磁盘写的,在上面已经图示过了,这边看代码!

在这里插入图片描述


通过ShuffleManager写出

在这里插入图片描述


ShuffleManager中getWriter方法中,会根据不同的Handler获取不同的Writer,因此不同的Handler的写方式是不一样的!那么handler是如何获取的呢?

在这里插入图片描述

针对不同的条件可以获取不同的Handler:

在这里插入图片描述

(1)获取BypassMergeSortShuffleHandler

在这里插入图片描述


在这里插入图片描述


结论:

  1. map端不能有预聚合
  2. 下游分区数,即 reduceTask数量 <= 200(可配置)

(2)获取SerializedShuffleHandler

有序列化的能力,可以将对象序列化后保存到内存;

在这里插入图片描述


使用条件:

  • 序列化规则支持重定位操作(Java序列化不支持,Kryo序列化框架支持)
  • Map端不能有预聚合
  • 下游分区数必须<= 16777216

(3)获取BaseShuffleHandler

其他情况

总结

在这里插入图片描述

7.3、SortShuffle 解析

SortShuffleManager 的运行机制主要分成三种:

  • 普通运行机制;
  • bypass 运行机制
    当 shuffle read task 的数量小于等spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制;
  • Tungsten Sort 运行机制
    开启此运行机制需设置配置项 spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定采用此运行机制(后面会解释)。

7.3.1、普通 SortShuffle

普通的SortShuffle过程指的就是 BaseShuffleHandler的SortShuffleWriter 的写磁盘过程

在这里插入图片描述

  • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
  • 在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。
    BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。
  • 一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。
  • SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

7.3.2、bypass SortShuffle

应用场景:

  • Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。

使用条件
对于 Reducer 端任务数少于配置spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

bypass 运行机制的触发条件

  • shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
  • 不是聚合类的 shuffle 算子。

在这里插入图片描述

过程

  • 每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
  • 该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
  • 而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

SortShuffle和byPassSortShuffle 最终都是形成一个数据文件一个索引文件,只不过方式不同,SortShuffle是通过排序,ByPass是先单独往多个文件写,再将这些文件合并,从而避免排序,因此Bypass会产生大量的中间小文件,所以这也是为什么ByPassSortShuffle的使用要求是ReduceTask数量不能大于200

7.3.3、Tungsten Sort Shuffle 运行机制

Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置了 spark.shuffle.manager=tungsten-sort 时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。

8、spark内存管理

8.1、堆内和堆外内存规划

  • Executor 作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。
  • 同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
  • 堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

    在这里插入图片描述

8.1.1、堆内内存划分

  • 堆 内 内 存 的 大 小 , 由 Spark 应 用 程 序 启 动 时 的 – executor-memoryspark.executor.memory 参数配置。
  • Executor 内运行的并发任务共享 JVM 堆内内存

1.存储内存

  • Executor 内运行的并发任务在缓存 RDD 数据广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存

2.执行(Execution)内存

  • Executor 内运行的并发任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存

3.剩余内存

  • 剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。

4.预留内存

  • 防止OOM用的;预留300M

不同的管理模式下,这三部分占用的空间大小各不相同:

在这里插入图片描述


Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理

如何理解这句话?

答:对象实例占用内存的申请和释放 本质上都是由 JVM 完成的,Spark 只能以引用的方式在申请后和释放前记录这些内存。

我们来看其具体流程:

申请内存流程如下:

  • Spark 在代码中 new 一个对象实例;
  • JVM 从堆内内存分配空间,创建对象并返回对象引用;
  • Spark 保存该对象的引用,记录该对象占用的内存。

释放内存流程如下:

  • Spark 记录该对象释放的内存,删除该对象的引用;
  • 等待 JVM 的垃圾回收机制释放该对象占用的堆内内存。

Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。

1.为什么不能准确记录对内内存,避免OOM?

(1)序列化对象可以精准计算
我们知道,JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。
(2)非序列化对象无法精准计算
对于 Spark中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期。

(3)Spark标记为垃圾的对象,并未被JVM回收
此外,在被 Spark 标记为释放的对象实例,很有可能在实际上并没有被 JVM 回收,导致实际可用的内存小于 Spark 记录的可用内存。

所以 Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。

2.Spark的解决方案

虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

8.1.2、堆外内存

堆外内存只有两部分:执行内存和存储内存

  • 为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off- heap)内存,使之可以直接在工作节点的系统内存中开辟空间,堆外内存存储内容:存储经过序列化的二进制数据
  • 堆外内存意味着把内存对象分配在 Java 虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。优点1:这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响
  • 利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。
  • 优点2:堆外内存可以被精确地申请和释放
    精准有两个含义
    1.时间上精准
    堆外内存之所以能够被精确的申请和释放,是由于内存的申请和释放不再通过 JVM 机制,而是直接向操作系统申请,JVM 对于内存的清理是无法准确指定时间点的,因此无法实现精确的释放
    2.空间上精准
    堆外内存存储的是序列化的二进制数据,而序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

如何设置Spark对外内存?

  • 默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled参数启用
  • spark.memory.offHeap.size参数设定堆外空间的大小。

除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

8.2、内存空间分配

8.2.1、静态内存管理

静态内存管理的特点:

在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置


堆内内存的分配如图所示:

在这里插入图片描述


可用的存储内存:
systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

可用的执行内存:
systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

  • systemMaxMemory 取决于当前 JVM 堆内内存的大小
  • safetyFraction 其意义在于在逻辑上预留出1-safetyFraction 这么一块保险区域,降低由实际内存超出当前预设范围而导致 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理.
  • Storage 内存和 Execution 内存都有预留空间,目的是防止 OOM,因为 Spark 堆内内存大小的记录是不准确的,需要留出保险区域。

堆外内存的分配如图所示:

在这里插入图片描述

  • 堆外的空间分配较为简单,只有存储内存和执行内存
  • 可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction决定
  • 由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域

8.2.2、统一内存管理

Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域.

(1)统一内存管理的堆内内存结构如图所示:

在这里插入图片描述


虚线表示不固定,可以移动

(2)统一内存管理的堆外内存结构如下图所示:

在这里插入图片描述

其中最重要的优化在于动态占用机制,其规则如下:

  • 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;
  • 双方的空间都不足时,则存储到硬盘
  • 若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • 当存储内存占用执行内存的时候,可让存储内存占用的部分转存到硬盘,然后”归还”借用的空间;
  • 当执行内存占用存储内存的时候,无法让执行内存”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

统一内存管理的动态占用机制如图所示:

在这里插入图片描述


淘汰机制和RDD缓存级别有关,如果cache是Memory only,那么这部分数据不能写入磁盘,只能被删除;这意味着在正常情况下,RDD的数据都会有可能丢失,所以在RDD cache过程中是不能切断血缘关系的;

执行内存不能丢失,所以执行内存是霸道总裁有借无还!

9、spark面试题

  1. Spark数据倾斜

    【Spark 常见问题】Spark数据倾斜

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340