如何解决如何在Delta Lake表中添加新列?
我正在尝试向存储为Azure Blob存储中的增量表的数据添加新列。对数据执行的大多数操作都是upsert,具有许多更新和少量新插入。目前,我用于写入数据的代码如下:
DeltaTable.forPath(spark,deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
在these docs中,Delta Lake仅支持在insertAll()
和updateAll()
调用中添加新列。但是,我仅在满足某些条件并希望将新列添加到所有现有数据(默认值为null
)时进行更新。
我想出了一个似乎非常笨拙的解决方案,并且想知道是否有更优雅的方法。这是我当前建议的解决方案:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema","true").mode("overwrite").save(deltaPath)
解决方法
首先更改您的增量表,然后执行合并操作:
from pyspark.sql.functions import lit
spark.read.format("delta").load('/mnt/delta/cov')\
.withColumn("Recovered",lit(''))\
.write\
.format("delta")\
.mode("overwrite")\
.option("overwriteSchema","true")\
.save('/mnt/delta/cov')
,
您是否尝试过使用merge语句?
https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。