Spark中foreachRDD、foreachPartition和foreach的区别是什么

这篇文章主要介绍“Spark中foreachRDD、foreachPartition和foreach的区别是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的区别是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark中foreachRDD、foreachPartition和foreach的区别是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

区别

最近有不少同学问我,Spark 中 foreachRDD、foreachPartition和foreach 的区别,工作中经常会用错或不知道怎么用,今天简单聊聊它们之间的区别:

其实区别它们很简单,首先是作用范围不同,foreachRDD 作用于 DStream中每一个时间间隔的 RDD,foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition,foreach 作用于每一个时间间隔的 RDD 中的每一个元素。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

SparkStreaming 中对 foreachRDD的说明。

foreach 与 foreachPartition都是在每个partition中对iterator进行操作,不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用,而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理(可以避免内存溢出)

一个简单的例子

在Spark 官网中,foreachRDD被划分到Output Operations on DStreams中,所有我们首先要明确的是,它是一个输出操作的算子,然后再来看官网对它的含义解释:The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

最常用的输出操作,需要一个函数作为参数,函数作用于DStream中的每一个RDD,函数将RDD中的数据输出到外部系统,如文件、数据库,在driver上执行

函数中通常要有action算子,因为foreachRDD本身是transform算子

官网还给出了开发者常见的错误:

Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example :(中文解析见代码下方)

// ① 这种写法是错误的 ❌dstream.foreachRDD { rdd =>  val connection = createNewConnection()  // executed at the driver  rdd.foreach { record =>    connection.send(record) // executed at the worker  }}

上面说的是我们使用foreachRDD向外部系统输出数据时,通常要创建一个连接对象,如果像上面的代码中创建在 driver 上就是错误的,因为foreach在每个节点上执行时节点上并没有连接对象。driver节点就一个,而worker节点有多个。

所以,我们改成下面这样:

// ② 把创建连接写在 forech 里面,RDD 中的每个元素都会创建一个连接dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection() // executed at the worker    connection.send(record) // executed at the worker    connection.close()  }}

这时不会出现计算节点没有连接对象的情况。但是,这样写会在每次循环RDD的时候都会创建一个连接,创建连接和关闭连接都很频繁,造成系统不必要的开销。

可以通过使用 foreachPartirion 来解决这类问题:

// ③ 使用foreachPartitoin来减少连接的创建,RDD的每个partition创建一个链接dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

上面这种方式还可以优化,虽然连接申请变少了,但是对一每一个partition来说,连接还是没有办法复用,所以我们可以引入静态连接池。官方说明:该连接池必须是静态的、懒加载的。

// ④ 使用静态连接池,可以增加连接的复用、减少连接的创建和关闭。dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    // ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}

这里需要注意的是:使用连接池中的连接应按需创建,如果有一段时间不使用,则应超时,这样实现了向外部系统最有效地发送地数据。

到此,关于“Spark中foreachRDD、foreachPartition和foreach的区别是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程之家网站,小编会继续努力为大家带来更多实用的文章!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展API用来支持高吞吐、高容错的处理流式数据数据源可以是:Kafka、TCPsockets、Flume、Twitter等流式数据源处理数据:可以用SparkCore的算子map、reduce、join、window
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“S...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数据分析”,在日常操作中,相信很多人在TSDB的数据怎么利用Hadoop/spark集群做数据分析问题上存在疑惑...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
小编给大家分享一下Hadoop和Spark有什么不同,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们
这篇文章主要讲解了“Hadoop和Spark的Shuffle过程有什么不同”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习...
本篇文章给大家分享的是有关基于CDP7.1.1的Spark3.0技术预览版本分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获...
这篇文章主要介绍“Spark中foreachRDD、foreachPartition和foreach的区别是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的...
本篇内容主要讲解“spark的动态分区裁剪怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark的动态分...
本篇内容介绍了“spark的动态分区裁剪下物理计划怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下
这篇文章给大家介绍基于Spark和TensorFlow 的机器学习实践是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。EMR E-Learning平台...
这篇文章将为大家详细讲解有关如何进行EMR Spark-SQL性能极致优化的分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识...
如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决...
如何浅析Hive和Spark SQL读文件时的输入任务划分,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个...
这篇文章将为大家详细讲解有关Hive on Spark参数如何调优,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言Hive on Spa...
这篇文章将为大家详细讲解有关fs.defaultFS变更使spark-sql查询hive失败是怎么回事,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以...
这篇文章将为大家详细讲解有关怎么解析SparkCore和SparkSQL,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解...
怎么快速搭建Spark开发环境,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一,搭建本...