如何解决Spark UDAF/Aggregator 按顺序处理记录组
我想用 Spark 做一些自定义 groupBy 聚合,这需要按顺序处理记录(时间戳),并且第 n 条记录的处理需要处理前 (n-1) 条记录(声音有点像流式任务?)。 输入位于按日期分区的一大组文件中。
我目前的解决方案是实现自定义 org.apache.spark.sql.expressions.Aggregator
,它将所有输入记录增量插入缓冲区并在最后进行所有聚合。伪代码如下:
class MyAgg extends Aggregator[IN,SortedList[IN],OUT] {
override def zero: SortedList[IN] = SortedList.empty
override def reduce(b: SortedList[IN],e: Event): SortedList[IN] =
insert_into_b(e)
override def merge(b1: SortedList[IN],b2: SortedList[IN]): SortedList[IN] =
merge_two_lists(b1,b2)
override def finish(b: SortedList[IN]): OUT =
my_main_aggregation_happens_here:
b.foldLeft ...
}
val result = myInputDS.groupBy(_.key).agg((new MyAgg()).toColumn)
此解决方案有效,但我非常担心性能,因为reduce 阶段根本不会减少任何内容,并且所有记录都需要存储在内存中直到最后。我希望有更好的解决方案。
你能帮忙吗? 谢谢。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。