如何解决Kstream抑制大量数据时的失败
有人试图以超过240K /秒的数据速率使用Kafka Suppress。
我们有一个集群,可以根据需要设置和配置集群(包括网络和磁盘I / O),以实现最大吞吐量。
以下代码段效果很好
Stream<String,OutputRecord> deltaStream = this.stream
.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(Constants.WINDOW_TIME)).advanceBy(Duration.ofMinutes(Constants.ADVANCE_BY)).grace(Duration.ofMinutes(Constants.GRACE_PERIOD)))
.reduce((value1,value2) -> value2,Materialized.<String,InputRecord,WindowStore<Bytes,byte[]>>as("reduced-state-store")
.withKeySerde(Serdes.String())
.withValueSerde(SerdesCollection.getInputRecordSerdes())
.withRetention(Duration.ofMinutes(Constants.REDUCE_STORE_RETENTION_PERIOD)))
.transform(new DeltaTransformer(this.store.name()),this.store.name())
.flatTransformValues(new InterpolationTransformer(this.interpolationStore.name()),this.interpolationStore.name());
但是,我们的逻辑要求每个窗口仅输出一个数据包,因此我们想使用抑制功能。
但是,一旦我们添加
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
由于大量的网络流量,我们看到Kafka群集的行为异常。
增加实例吞吐量和iops并不能使结果平滑,因此想知道抑制功能是否真的已经过大规模测试。
再三考虑---将kafka流应用程序托管在与kafka群集相同的实例上以实现高网络吞吐量的应用程序有什么目的?有没有 affinity 逻辑用于这种目的?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。