如何解决如何获取结构化流中的记录数计数?
我有一个Spark结构化的流媒体,在我有正确的记录计数后,我需要停止流媒体播放过程。
到目前为止,我有一种方法可以在没有活动的特定时间间隔停止流式传输,如何添加一个计数器来获取计数并关闭流式传输?
val resultStream=furtherFlattening
.writeStream
.format("console")
.option("truncate","false")
.trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS))
//. trigger(Trigger.ProcessingTime(5,TimeUnit.MINUTES))
.start()
.awaitTermination()
def stopStreamQuery(query: StreamingQuery,awaitTerminationTimeMs: Long,spark:SparkSession) {
while (query.isActive) {
val msg = query.status.message
if (!query.status.isDataAvailable
&& !query.status.isTriggerActive
&& !msg.equals("Initializing sources")) {
query.stop()
spark.close()
}
query.awaitTermination(awaitTerminationTimeMs)
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。