如何解决在 Scala 中使用 Spark 写入 JSON 格式之前,在每行前面添加一个新行
我想在 Spark 将它写入我的 s3 存储桶之前在我的每个 json 文档前面添加一个新行:
df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY,LAST_MODIFIED_DATE,NVL(CLASS_NAME,className) as CLASS_NAME,DECISION,TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()
parkSQL.write.json("s3://test-bucket/json-output-7/")
仅使用此命令,它将生成包含以下内容的文件:
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
但是,我想要实现的目标如下:
{"index":{}}
{"LAST_MODIFIED_BY":"david","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"LAST_MODIFIED_BY":"sarah","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
任何有关如何实现这一结果的见解将不胜感激!
解决方法
下面的代码会将 {"index":{}}
与 DataFrame
中的现有行数据合并,并将数据转换为 json
,然后使用 text
格式保存 json 数据。
df
.select(
lit("""{"index":{}}""").as("index"),to_json(struct($"*")).as("json_data")
)
.select(
concat_ws(
"\n",// This will split index column & other column data into two lines.
$"index",$"json_data"
).as("data")
)
.write
.format("text") // This is required.
.save("s3://test-bucket/json-output-7/")
最终输出
cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt
{"index":{}}
{"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。