Spark4-RDD使用

如何创建RDD

1.创建方式

1.parallelizing an existing collection in your driver program

通过并行化存在的一个集合,将集合转换成RDD

2.referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

使用外部的存储系统,外部文件系统包括hdfs hbase

2.RDD创建

2.1集合(数组)方式转换

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distData.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)

在webUI界面中我们可以看到task的执行数量,task的数量可以在spark/conf目录下spark-default.conf文件设置

spark.default.parallelism 24

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster.

val distData = sc.parallelize(data,5)

在并行化操作时,会将数据集拆分成partition的数量,例如指定5,webUI界面可以看到task的数量也是5,所以在spark中,partition的数量 = task的数量.

2.2外部数据集

使用sc.textFile(“path”)指定本地文件数据源

scala> val testData = sc.textFile("file:///opt/scripts/check_yarn.txt")
testData: org.apache.spark.rdd.RDD[String] = file:///opt/scripts/check_yarn.txt MapPartitionsRDD[10] at textFile at <console>:24

scala> testData.collect
res6: Array[String] = Array(lyhU51BaseWebBankStg, lyhClientContactStg, lyhNfcsStg, lyhU51AdditionAppStg, lyhApplicationStg, lyhU51BaseOperatorStg, lyhU51AdditionInfoStg, lyhAppCrossStg, lyhMoxieCarrierStg)

使用sc.textFile(“path”)指定hdfs数据源

scala> val testData = sc.textFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt")
testData: org.apache.spark.rdd.RDD[String] = hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt MapPartitionsRDD[18] at textFile at <console>:24

scala> testData.collect
res11: Array[String] = Array(lyhU51BaseWebBankStg, lyhClientContactStg, lyhNfcsStg, lyhU51AdditionAppStg, lyhApplicationStg, lyhU51BaseOperatorStg, lyhU51AdditionInfoStg, lyhAppCrossStg, lyhMoxieCarrierStg)

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

如果是指定文件夹的话,在写路径的时候可以只写到文件夹目录,同时也可以指定特定文件

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

textFile minPartition:在textFile()的文件路径后面可以使用另外一个可选参数,可以控制partition的数量,默认一个block创建一个partition,也可以通过设置一个更大的值来设置partition的大小和数量.

wholeTextFiles

scala> val testData = sc.wholeTextFiles("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs")
testData: org.apache.spark.rdd.RDD[(String, String)] = hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs MapPartitionsRDD[20] at wholeTextFiles at <console>:24

scala> testData.collect
res12: Array[(String, String)] =
Array((hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/check_yarn.txt,"lyhU51BaseWebBankStg
lyhClientContactStg
lyhNfcsStg
lyhU51AdditionAppStg
lyhApplicationStg
lyhU51BaseOperatorStg
lyhU51AdditionInfoStg
lyhAppCrossStg
lyhMoxieCarrierStg
"))

跟上面的textFile()不同的是,使用wholeTextFiles()既可以返回一个键值对(filename,content),可以看到文件的目录名称以及内容.

2.3RDD文件保存

scala> testData.saveAsTextFile("hdfs://stg.bihdp01.hairongyi.local:8020/user/hdfs/testData")
#然后在hdfs对应的目录下能看见我们保存的文件
# hadoop fs -ls /user/hdfs/testData
Found 3 items
-rw-r--r--   2 root supergroup          0 2019-05-06 17:23 /user/hdfs/testData/_SUCCESS
-rw-r--r--   2 root supergroup         91 2019-05-06 17:23 /user/hdfs/testData/part-00000
-rw-r--r--   2 root supergroup         78 2019-05-06 17:23 /user/hdfs/testData/part-00001

# hadoop fs -text /user/hdfs/testData/*
lyhU51BaseWebBankStg
lyhClientContactStg
lyhNfcsStg
lyhU51AdditionAppStg
lyhApplicationStg
lyhU51BaseOperatorStg
lyhU51AdditionInfoStg
lyhAppCrossStg
lyhMoxieCarrierStg

可以看到我们的结果是保存进去了,但是最后生成了两个结果文件,一般而言,saveAsTextFile会按照执行task的多少生成多少个文件,比如part-00000一直到part-0000n,n自然就是task的个数,亦即是最后的stage的分区数。在RDD上调用 coalesce(1,true).saveAsTextFile()又或者,可以调用repartition(1)这两个方法来使我们最后的结果输出到一个文件中.

tips:如果机器的内存不够大,但是数据很大,难以在单机内存上装下,以上操作可能会造成单机内存不足。

原文地址:https://blog.csdn.net/qq_31405633/article/details/89887783

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 TwemojiTwemoji是Twitter开源的其完整的Emoji表情图片。开发者可以去GitHub下载完整的表情库,并把这些表情加入到自己的应用或网页中。使用示例:var i = 0;twemoji.parse(  ’emoji, m\u276
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即两个对象在内存中的首地址;equals比较字符串中所包含的内容是否相同。publicstaticvoidmain(String[]args){​ Strings1="abc"; Strings2=newString("abc");​ System.out.println(s1==s2)
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala怎么使用”吧!语法scala...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相信很多人在Scala是一种什么语言问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相信很多人在Scala Trait怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日常操作中,相信很多人在Scala类型检查与模式匹配怎么使用问题上存在疑惑,小编查阅了各式资料,整理...
这篇文章主要介绍“scala中常用但不常见的符号有哪些”,在日常操作中,相信很多人在scala中常用但不常见的符号有哪些问题上存在疑惑,小编查阅了各式资料,整理...
本篇内容主要讲解“Scala基础知识有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala基础知识有哪些”...
本篇内容介绍了“scala基础知识点有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧...
本篇内容介绍了“Scala下划线怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧...
本篇内容主要讲解“Scala提取器怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala提取器怎么使用”...
这篇文章主要讲解了“Scala基础语法有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Scala基础语法有...
本篇内容主要讲解“Scala方法与函数怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala方法与函数怎...
这篇文章主要讲解了“scala条件控制与循环怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“scala条...
这篇文章主要介绍“scala函数怎么定义和调用”,在日常操作中,相信很多人在scala函数怎么定义和调用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操
这篇文章主要介绍“scala如何声明变量”,在日常操作中,相信很多人在scala如何声明变量问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
这篇文章主要讲解了“scala的Map和Tuple怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“scala的Ma...
这篇文章主要介绍“scala的隐式参数有什么作用”,在日常操作中,相信很多人在scala的隐式参数有什么作用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的...
本篇内容主要讲解“Scala怎么进行文件写操作”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala怎么进行文件...
这篇文章主要讲解了“Scala怎么声明数组”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Scala怎么声明数组...