如何解决嵌套列的 Spark、Delta Lake 自动模式演变
模式演化在合并时的工作深度是多少?
在以下情况下合并时自动模式演变不起作用。
import json
d1 = {'a':'b','b':{'c':{'1':1}}}
d2 = {'a':'s','b':{'c':{'1':2,'2':2}}}
d3 = {'a':'v','b':{'c':{'1':4}}}
df1 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d1)]))
#passes
df1.write.saveAsTable('test_table4',format='delta',mode='overwrite',path=f"hdfs://hdmaster:9000/dest/test_table4")
df2 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d2)]))
df2.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records
USING updates updates
ON existing_records.a=updates.a
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #passes
df3 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d3)]))
df3.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records
USING updates updates
ON existing_records.a=updates.a
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #FAILS #FAILS
当深度大于 2 并且传入的 df 缺少列时,这看起来像是失败。
这是故意的吗?
如果要追加,这可以用 option("mergeSchema","true")
完美处理。但我想 UPSERT 数据。但是 Merge 无法处理这种架构更改
使用 Delta Lake 0.8.0 版
解决方法
在 Delta 0.8 中,除了更适用于 spark.databricks.delta.schema.autoMerge.enabled
模式的 true
之外,还应通过将 mergeSchema
设置为 append
来调节。
有关此功能的更多详细信息,请参阅 Delta 0.8 announcement blog post。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。