如何解决KGroupedStream与协同分组,聚合和抑制
试图对流进行分组,聚合和抑制,从而导致-
Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app')
val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(),serdesConfig.notificationSerde()))
val streams2 = confirmationStreams
.groupByKey(Grouped.with(Serdes.String(),serdesConfig.confirmationsSerde()))
val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2,confirmationsAggregator)
.windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
.aggregate({ null },Materialized.`as`<String,NotificationMetric,WindowStore<Bytes,ByteArray>>("time-windowed-aggregated-stream-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
但是,如果我移除了.suppress
,它就可以正常工作。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。