如何解决Spark UDF无法正确提供滚动计数
我有一个Spark UDF,可以计算出一列的滚动计数,准确地说是时间。如果我需要计算24小时的滚动计数(例如,输入时间2020-10-02 09:04:00),则需要回溯至2020-10-01 09:04:00(非常准确)。
如果我在本地运行,滚动计数UDF可以很好地工作并给出正确的计数,但是当我在群集上运行时,其给出的结果不正确。这是示例输入和输出
输入
+---------+-----------------------+
|OrderName|Time |
+---------+-----------------------+
|a |2020-07-11 23:58:45.538|
|a |2020-07-12 00:00:07.307|
|a |2020-07-12 00:01:08.817|
|a |2020-07-12 00:02:15.675|
|a |2020-07-12 00:05:48.277|
+---------+-----------------------+
预期产量
+---------+-----------------------+-----+
|OrderName|Time |Count|
+---------+-----------------------+-----+
|a |2020-07-11 23:58:45.538|1 |
|a |2020-07-12 00:00:07.307|2 |
|a |2020-07-12 00:01:08.817|3 |
|a |2020-07-12 00:02:15.675|1 |
|a |2020-07-12 00:05:48.277|1 |
+---------+-----------------------+-----+
最后两个输入值在本地为4和5,但是在群集上它们是不正确的。我最好的猜测是,数据正在执行程序之间分布,并且在每个执行程序上也并行调用udf。由于UDF的参数之一是列(在此示例中,分区键-OrderName),如果是这样,我如何控制/纠正群集的行为。这样它就可以以正确的方式为每个分区计算正确的计数。有任何建议吗
解决方法
根据您的评论,您要计算最近24小时内每条记录的总记录数
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
//A sample data (Guessing from your question)
val df = Seq(("a","2020-07-10 23:58:45.438","1"),("a","2020-07-11 23:58:45.538","2020-07-11 23:58:45.638","1")).toDF("OrderName","Time","Count")
// Extract the UNIX TIMESTAMP for your time column
val df2 = df.withColumn("unix_time",concat(unix_timestamp($"Time"),split($"Time","\\.")(1)).cast(LongType))
val noOfMilisecondsDay : Long = 24*60*60*1000
//Create a window per `OrderName` and select rows from `current time - 24 hours` to `current time`
val winSpec = Window.partitionBy("OrderName").orderBy("unix_time").rangeBetween(Window.currentRow - noOfMilisecondsDay,Window.currentRow)
// Final you perform your COUNT or SUM(COUNT) as per your need
val finalDf = df2.withColumn("tot_count",count("OrderName").over(winSpec))
//or val finalDf = df2.withColumn("tot_count",sum("Count").over(winSpec))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。