如何解决合并到增量表中,不适用于Java foreachbatch
我已经创建了一个增量表,现在我正尝试使用foreachBatch()将数据插入到该表中。我已遵循此example。唯一的区别是我使用Java而不是在笔记本中使用Java,但我认为那应该没有什么区别?
我的代码如下:
spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int,load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");
Dataset<Row> exampleDF = spark.sql("SELECT e.id as id,e.load_date as load_date FROM example e");
try {
exampleDF
.writeStream()
.format("delta")
.foreachBatch((dataset,batchId) -> {
dataset.persist();
// Set the dataframe to view name
dataset.createOrReplaceTempView("updates");
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
dataset.sparkSession().sql("MERGE INTO example_src_table e" +
" USING updates u" +
" ON e.id = u.id" +
" WHEN NOT MATCHED THEN INSERT (e.id,e.load_date) VALUES (u.id,u.load_date)");
})
.outputMode("update")
.option("checkpointLocation","/mnt/delta/events/_checkpoints/example_src_table")
.start();
} catch (TimeoutException e) {
e.printStackTrace();
}
此代码运行时没有任何问题,但是没有将数据写入带有url'/ mnt / delta / events / example_src_table'的delta表中。有人知道我在做什么错吗?
我正在使用Spark 3.0和Java 8。
编辑
使用Scala在Databricks Notebook上进行了测试,然后效果很好。
解决方法
如果要使用新数据更新数据,请尝试遵循以下语法:
WHEN NOT MATCHED THEN
UPDATE SET e.load_date = u.load_date AND e.id = u.id
如果您只想添加数据,则它将占用类似的内容
WHEN NOT MATCHED THEN INSERT *
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。