如何解决KStream窗口聚合
尝试合并多个Kafka Streams,汇总并产生一个新主题。但是,在同一窗口中,代码产生的聚合记录与每个输入流中的总输入记录一样多。我希望聚合只在连接窗口的末尾产生1个输出。我在下面的代码中做错了什么?
soup = BeautifulSoup(driver.page_source,"lxml")
price_list= []
for item in soup.select('.valueValue-3kA0oJs5'):
[elem.extract() for elem in soup("div")]
price_list.append(item.text)
print(price_list[3])
解决方法
Kafka Streams默认使用连续更新处理模型。请注意,聚合的结果是KTable
。此结果表为每个窗口包含一行,并且每次处理新记录时,都会更新该窗口(即表中的行)。
如果调用KTable#toStream()
,则将获得表的更改日志流,其中包含对该表的每次更新的记录。
如果每个窗口只希望获得一个结果,则可以使用suppress()
运算符来获取第二个KTable
,即suppress()
取第一个KTable
的changelog流,并等待直到关闭窗口,才将最终结果插入其输出KTable
中。如果使用suppress()
,则应将上游窗口聚合的宽限期(默认为24h)设置为较低的值,即TimeWindows.of(...).grace(...)
。
有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。