如何解决向流数据帧添加ROW_NUMBER列
我对Spark和SQL相当陌生。我试图在我的df中添加一列(然后将其保存到Delta表中),该列为每个记录/行提供唯一的ID,并在每次更新特定记录时对其进行递增。
我正在尝试执行以下操作:
SELECT etc,CONCAT(somerows1) as id1,ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc
somerows1是几列的串联,以形成唯一记录。我对以特定格式排序的记录没有特别的兴趣,这就是为什么我选择ORDER BY(SELECT NULL)。
我收到以下错误:
Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;
有人对如何解决这个问题有任何想法吗?
谢谢
解决方法
您要查找的是在滑动事件时间窗口上的聚合。查看文档和示例here。
,我通过使用.writeStream
上的foreachBatch接收器解决了这个问题。这样,您就可以创建一个函数,在该函数中,流数据帧被视为静态/批处理数据帧(该功能将应用于每个微批处理)。
在Scala中,代码如下所示:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number,lit}
val saveWithWindowFunction = (sourceDf: DataFrame,batchId: Long) => {
val windowSpec = Window
.partitionBy("somerows1")
.orderBy(lit(null))
sourceDf
.withColumn("versionid",row_number().over(windowSpec))
//... save the dataframe using: sourceDf.write.save()
}
通过.writeStream
调用您的函数:
.writeStream
.format("delta")
.foreachBatch(saveWithWindowFunction)
.start()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。