如何解决如何处理分隔符是否出现在Spark rdd的数据中
使用spark RDD加载文件时,如何处理数据中是否存在分隔符。
我的数据如下:
NAME|AGE|DEP
Suresh|32|BSC
"Sathish|Kannan"|30|BE
如何将此列转换为如下所示的3列。
NAME AGE DEP
suresh 32 Bsc
Sathish|Kannan 30 BE
请参阅我如何尝试加载数据。
scala> val rdd = sc.textFile("file:///test/Sample_dep_20.txt",2)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://Hive/Sample_dep_20.txt MapPartitionsRDD[1] at textFile at <console>:27
rdd.collect.foreach(println)
101|"Sathish|Kannan"|BSC
102|Suresh|DEP
scala> val rdd2=rdd.map(x=>x.split("\""))
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29
scala> val rdd3=rdd2.map(x=>
| {
| var strarr = scala.collection.mutable.ArrayBuffer[String]()
| for(v<-x)
| {
| if(v.startsWith("\"") && v.endsWith("\""))
| strarr +=v.replace("\"","")
| else if(v.contains(","))
| strarr ++=v.split(",")
| else
| strarr +=v
| }
| strarr
| }
| )
rdd3: org.apache.spark.rdd.RDD[scala.collection.mutable.ArrayBuffer[String]] = MapPartitionsRDD[3] at map at <console>:31
scala> rdd3.collect.foreach(println)
ArrayBuffer(101|,Sathish|Kannan,|BSC)
ArrayBuffer(102|Suresh|DEP)
解决方法
也许您需要将"
明确定义为引号字符(csv
阅读器默认情况下是引号字符,但在您的情况下可能不会吗?)。因此,在读取.csv文件时,在选项中添加.option("quote","\"")
应该可以。
scala> val inputds = Seq("Suresh|32|BSC","\"Satish|Kannan\"|30|BE").toDS()
inputds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val outputdf = spark.read.option("header",false).option("delimiter","|").option("quote","\"").csv(inputds)
outputdf: org.apache.spark.sql.DataFrame = [_c0: string,_c1: string ... 1 more field]
scala> outputdf.show(false)
+-------------+---+---+
|_c0 |_c1|_c2|
+-------------+---+---+
|Suresh |32 |BSC|
|Satish|Kannan|30 |BE |
+-------------+---+---+
定义使DataFrameReader
忽略带引号的字符串中的分隔符,请参见Spark API文档here。
编辑
如果您想努力工作并且仍然使用普通的RDD,请尝试像这样修改split()
函数:
val rdd2=rdd.map(x=>x.split("\\|(?=([^\"]*\"[^\"]*\")*[^\"]*$)"))
它使用正向预见来忽略引号内的|
分隔符,并避免您在第二个.map
中进行字符串操作。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。