Spark系列之Spark的RDD详解


title: Spark系列


第五章 Spark 的RDD详解

5.1 RDD概述

​ RDD 是 Spark 的基石,是实现 Spark 数据处理的核心抽象。那么 RDD 为什么会产生呢?

​ Hadoop的MapReduce是一种基于数据集的工作模式,面向数据,这种工作模式一般是从存储上加载数 据集,然后操作数据集,最后写入物理存储设备。数据更多面临的是一次性处理。

​ MapReduce的这种方式对数据领域两种常见的操作不是很高效。第一种是迭代式的算法。比如机器学习中ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。 MapReduce这种模式不太合适,即使多MapReduce串行处理,性能和时间也是一个问题。数据的共享 依赖于磁盘。另外一种是交互式数据挖掘,MapReduce显然不擅长。

MapReduce中的迭代

在这里插入图片描述

Spark中的迭代

在这里插入图片描述

​ 我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark应运而生。RDD是基于 工作集的工作模式,更多的是面向工作流。

​ 但是无论是MapReduce还是RDD都应该具有类似位置感知、容错和负载均衡等特性。

5.2 什么是RDD

RDD(Resilient Distributed Dataset),叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建RDD、转化已有 RDD 以及调用RDD操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

​ RDD 支持两种操作:转化Transformation操作 和 行动Action操作。RDD 的Transformation操作是返回一个新的 RDD 的操作,比如 map() 和 filter(),而Action操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如 count() 和 first()。

​ Spark 采用惰性计算模式,RDD只有第一次在一个行动操作中用到时,才会真正计算。Spark可以优化 整个计算过程。默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个 行动操作中重用同一个RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

​ 可以从三个方面来理解:

1、只读数据集DataSet:故名思议,RDD是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑 视图。从外部来看,RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。RDD是只 读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的 RDD。由一个RDD转换到另一个 RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写 map 和 reduce 了

在这里插入图片描述

2、分布式Distributed/分区:RDD的数据可能在物理上存储在多个节点的磁盘或内存中,也就是所谓 的多级存储。RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。如果 RDD 是通过已有的文件系统构建,则 compute 函数是读取指定文件系 统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute 函数是执行转换逻辑将其他 RDD 的 数据进行转换。

在这里插入图片描述

3、弹性Resilient:虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通过 repartition 转换操作)并行计算单元的划分结构,也就是分区的数量。

Spark的RDD的弹性:

存储的弹性:内存和磁盘的自动切换
容错的弹性:数据丢失可以自动恢复
计算的弹性:计算出错重试机制
分片的弹性:根据需要重新分片
1、自动进行内存和磁盘数据存储的切换
Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换。

2、基于血统的高效容错机制
在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。

3、Task如果失败会自动进行特定次数的重试
RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。

4、Stage如果失败会自动进行特定次数的重试
如果Job某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。

5、Checkpoint和Persist可主动或被动触发
RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。

6、数据调度弹性
Spark把这个Job执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。

7、数据分片的高度弹性
可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。

总结:

	RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map,join,filter,groupby等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD 还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark 最初也就是实现 RDD 的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD 的关系类似于 Hadoop-MapReduce 关系。

5.3 RDD属性

在这里插入图片描述

1、A list of partitions:一组分片(Partition),即数据集的基本组成单位

1、一个分区通常与一个计算任务关联,分区的个数决定了并行的粒度;
2、分区的个数可以在创建RDD的时候进行设置。如果没有设置的话,默认情况下由节点的cores个数决定;
3、每个Partition最终会被逻辑映射为BlockManager中的一个Block,而这个Block会被下一个Task(ShuffleMapTask/ResultTask)使用进行计算

2、A function for computing each split:一个计算每个分区的函数,也就是算子

分区处理函数compute
1、每个RDD都会实现compute,用于对分区进行计算;
2、compute函数会对迭代器进行复合,不需要保存每次计算结果;
3、该方法负责接收parent RDDs或者data block流入的records并进行计算,然后输出加工后的records。

3、A list of dependencies on other RDDs:RDD之间的依赖关系:宽依赖和窄依赖

	RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

	RDDx依赖的parent RDD的个数由不同的转换操作决定,例如二元转换操作x = a.join(b),RDD x就会同时依赖于RDD a和RDD b。而具体的依赖关系可以细分为完全依赖和部分依赖,详细说明如下:
1、完全依赖:一个子RDD中的分区可以依赖于父RDD分区中一个或多个完整分区。
例如,map操作产生的子RDD分区与父RDD分区之间是一对一的关系;对于cartesian操作产生的子RDD分区与父RDD分区之间是多对多的关系。
2、部分依赖:父RDD的一个partition中的部分数据与RDD x的一个partition相关,而另一部分数据与RDD x中的另一个partition有关。
例如,groupByKey操作产生的ShuffledRDD中的每个分区依赖于父RDD的所有分区中的部分元素。

在这里插入图片描述

	在Spark中,完全依赖是NarrowDependency(黑色箭头),部分依赖是ShuffleDependency(红色箭头),而NarrowDependency又可以细分为[1:1]OneToOneDependency、[N:1]NarrowDependency和[N:N]NarrowDependency,还有特殊的RangeDependency (只在 UnionRDD中使用)。

	需要注意的是,对于[N:N]NarrowDependency很少见,最后生成的依赖图和ShuffleDependency没什么两样。只是对于父RDD来说,有一部分是完全依赖,有一部分是部分依赖。所以也只有[1:1]OneToOneDependency和[N:1]NarrowDependency两种情况。

4、Optionally,a Partitioner for key-value RDDs (e.g. to say that the RDD is hashpartitioned):一个Partitioner,即RDD的分片函数。

当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

1、只有键值对RDD,才会有Partitioner。其他非键值对的RDD的Partitioner为None;
2、它定义了键值对RDD中的元素如何被键分区,能够将每个键映射到对应的分区ID,从0到”numPartitions-1”上;
3、Partitioner不但决定了RDD本身的分区个数,也决定了parent RDD shuffle输出的分区个数。
4、在分区器的选择上,默认情况下,如果有一组RDDs(父RDD)已经有了Partitioner,则从中选择一个分区数较大的Partitioner;否则,使用默认的HashPartitioner。
5、对于HashPartitioner分区数的设置,如果配置了spark.default.parallelism属性,则将分区数设置为此值,否则,将分区数设置为上游RDDs中最大分区数。

5、Optionally,a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):一个列表,存储存取每个Partition的优先位置(preferred location)。

1、对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。
2、按照”移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
3、每个子RDDgetPreferredLocations的实现中,都会优先选择父RDD中对应分区的preferedLocation,其次才选择自己设置的优先位置。

在代码中的表现:

// 由子类实现以计算给定分区
def compute(split: Partition,context: TaskContext): Iterator[T]

// 获取所有分区
protected def getPartitions: Array[Partition]

// 获取所有依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps

// 获取优先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// 分区器 由子类重写以指定它们的分区方式
@transient val partitioner: Option[Partitioner] = None

5.4 创建RDD

创建RDD主要有两种方式:官网解释

There are two ways to create RDDs: parallelizing an existing collection in your driver program,or referencing a dataset in an external storage system,such as a shared filesystem,HDFS,HBase,or any data source offering a Hadoop InputFormat.

在这里插入图片描述

1、由一个已经存在的Scala数据集合创建
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd = sc.makeRDD(Array(1,8))

2、由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、
Cassandra、HBase等:val rdd =
sc.textFile("hdfs://hadoop10/spark/wc/input/words.txt")

3、从其他RDD转化来

5.5 RDD的编程API

官网链接: http://spark.apache.org/docs/latest/rdd-programming-guide.html

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD 的血缘关 系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件 系统中。

下图是RDD所支持的操作算子列表。

在这里插入图片描述

5.5.1 Transformations

官网: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

RDD中的所有转换(Transformation)都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用的Transformation:

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map,but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map,but runs separately on each partition (block) of the RDD,so func must be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions,but also provides func with an integer value representing the index of the partition,so func must be of type (Int,Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement,fraction,seed) Sample a fraction fraction of the data,with or without replacement,using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K,V) pairs,returns a dataset of (K,Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key,using reduceByKey or aggregateByKey will yield much better performance. Note: By default,the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func,[numPartitions]) When called on a dataset of (K,V) pairs where the values for each key are aggregated using the given reduce function func,which must be of type (V,V) => V. Like in groupByKey,the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp,combOp,U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type,while avoiding unnecessary allocations. Like in groupByKey,the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending],V) pairs where K implements Ordered,V) pairs sorted by keys in ascending or descending order,as specified in the boolean ascending argument.
join(otherDataset,[numPartitions]) When called on datasets of type (K,V) and (K,W),(V,W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin,and fullOuterJoin.
cogroup(otherDataset,(Iterable,Iterable)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U,returns a dataset of (T,U) pairs (all pairs of elements).
pipe(command,[envVars]) Pipe each partition of the RDD through a shell command,e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and,within each resulting partition,sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

总结:

Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外
一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的。

5.5.2 Actions

官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

常用的 Action:

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement,num,[seed]) Return an array with a random sample of num elements of the dataset,optionally pre-specifying a random number generator seed.
takeOrdered(n,[ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem,HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem,HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala,it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int,Double,String,etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization,which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K,V). Returns a hashmap of (K,Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

总结:

Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中

5.6 WordCount中的RDD

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

5.7 RDD算子练习

启动spark-shell

$SPARK_HOME/bin/spark-shell --master spark://hadoop10:7077

练习1:

//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5,8,9,1,10))
//对rdd1里的每一个元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x,true)
//过滤出大于等于十的元素
val rdd3 = rdd2.filter(_ >= 10)
//将元素以数组的方式在客户端显示
rdd3.collect

练习2:

val rdd1 = sc.parallelize(Array("a b c","d e f","h i j"))
//将rdd1里面的每一个元素先切分在压平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect

练习3:

val rdd1 = sc.parallelize(List(5,3))
val rdd2 = sc.parallelize(List(1,4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect

练习4:

val rdd1 = sc.parallelize(List(("huangbo",1),("xuzheng",3),("wangbaoqiang",2)))
val rdd2 = sc.parallelize(List(("liuyifei",2),("liutao",("liushishi",2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
rdd4.groupByKey
rdd4.collect

练习5:

val rdd1 = sc.parallelize(List(("huangbo",("shenteng",2)))
val rdd2 = sc.parallelize(List(("huangbo",33),("huangbo",44),11),22)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
rdd3.collect

练习6

val rdd1 = sc.parallelize(List(1,5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect

练习7:

val rdd1 = sc.parallelize(List(("tom",("jerry",("kitty",("shuke",1)))
val rdd2 = sc.parallelize(List(("jerry",("tom",5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1))
rdd5.collect

可以参考下面的链接: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

5.8 RDD的依赖关系

​ RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维 护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖(narrow dependency),RDDs之间分区是一一对应的,另一种是宽依赖(wide dependency),下游RDD的每个 分区与上游 RDD (也称之为父RDD)的每个分区都有关,是多对多的关系。

在这里插入图片描述

在这里插入图片描述

窄依赖和宽依赖对比:

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女,窄依赖的函数有:map,union,join(父RDD是hash-partitioned ),mapPartitions,mapValues

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:窄依赖我们形象的比喻为多生,宽依赖的函数有:groupByKey、partitionBy、reduceByKey、sortByKey、join(父RDD不是hash-partitioned)

​ 窄依赖和宽依赖总结:

	在这里我们是从父RDD的partition被使用的个数来定义窄依赖和宽依赖,因此可以用一句话概括下:如果父RDD的一个Partition被子RDD的一个Partition所使用就是窄依赖,否则的话就是宽依赖。因为是确定的partition数量的依赖关系,所以RDD之间的依赖关系就是窄依赖;由此我们可以得出一个推论:即窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖。

	一对固定个数的窄依赖的理解:即子RDD的partition对父RDD依赖的Partition的数量不会随着RDD数据规模的改变而改变;换句话说,无论是有100T的数据量还是1P的数据量,在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的,而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。

5.9 Lineage血统

​ RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记 录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分 分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

5.10 DAG生成

	DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

	对于窄依赖:由于分区的依赖关系是确定的,其转换操作可以在同一个线程执行,所以可以划分到同一个执行阶段;
	对于宽依赖:由于Shuffle的存在,只能在父RDD(s)被Shuffle处理完成后,才能开始接下来的计算,因此遇到宽依赖就需要重新划分阶段。

在这里插入图片描述

在这里插入图片描述

​ 在spark中,会根据RDD之间的依赖关系将DAG图(有向无环图)划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。

​ 首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算,例如先执行map操作,然后执行filter操作。而宽依赖则需要计算好所有父分区的数据,然后再在节点之间进行Shuffle,这与MapReduce类似。窄依赖能够更有效地进行数据恢复,因为只需重新对丢失分区的父分区进行计算,且不同节点之间可以并行计算;而对于宽依赖而言,如果数据丢失,则需要对所有父分区数据进行计算并再次Shuffle。

​ 因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。因此在上图中RDD C,RDD D,RDD E,RDD F被构建在一个stage中,RDD A被构建在一个单独的Stage中,而RDD B和RDD G又被构建在同一个stage中。

​ 在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask

​ 简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中;也就是说上图中的stage1和stage2相当于MapReduce中的Mapper,而ResultTask所代表的stage3就相当于MapReduce中的reducer。

​ 在之前动手操作了一个WordCount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不过区别在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根据Key进行reduce,但spark除了这两个算子还有其他的算子;因此从这个意义上来说,Spark比Hadoop的计算算子更为丰富。

5.11 RDD缓存

​ Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

​ 如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

​ 持久化也是懒执行的,持久化有两个操作:persist(StorageLevel),另外一个是cache,cache就相当于MEMORY_ONLY的persist。

5.11.1 RDD缓存方式

​ RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

在这里插入图片描述

​ 通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

在这里插入图片描述

​ 缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

5.12 Checkpoint

​ Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

​ cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo日志),也不能丢掉,当某个点某个executor宕了,上面 cache的RDD就会丢掉,需要通过依赖链重放计算出来,不同的是,checkpoint是把RDD保存在HDFS中,是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链,是通过复制实现的高容错。

如果存在以下场景,则比较适合使用检查点机制:

1、DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
2、在宽依赖上做checkpoint获得的收益更大。

​ 为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

5.13 Shared Variables(共享变量)

在Spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:

**广播变量(Broadcast Variable)累加器(**Accumulator)

官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables

5.13.1 Broadcast Variables(广播变量)

5.13.1.1 为什么要定义广播变量

​ 如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

没有使用广播变量:

在这里插入图片描述

使用了广播变量之后:

在这里插入图片描述

5.13.1.2 如何定义和还原一个广播变量

定义:

val a = 3 val broadcast = sc.broadcast(a)

还原:

val c = broadcast.value

注意:变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

5.13.1.3 注意事项

1、能不能将一个RDD使用广播变量广播出去?
	不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

2、广播变量只能在Driver端定义,不能在Executor端定义。

3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中都只有一份Driver端的变量副本。

5.13.2 Accumulators(累加器)

5.13.2.1 为什么要定义累加器

​ 在Spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会在driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

5.13.2.2 图解累加器

错误的图解:

在这里插入图片描述

正确的图解:

在这里插入图片描述

5.13.2.3 如果定义和还原一个累加器

定义累加器:

val acc = sc.longAccumulator(“myacc”)

还原累加器:

val value = acc.value

5.13.2.4 注意事项

1、累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

2、累加器不是一个调优的操作,因为如果不这样做,结果是错的


声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

原文地址:https://blog.csdn.net/luoyepiaoxue2014/article/details/128076389

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。