如何解决Kafka Streams复制因子不适用于状态存储变更日志主题
我们正在通过Spring Cloud Stream集成使用Kafka Streams。通过设置
,我将复制因子配置为在所有内部Kafka Streams主题中使用spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=${REPL_FACTOR}
它适用于Kafka Streams内部使用的大多数分区/变更日志主题。但是,此设置似乎对通过Materialized#as(StoreSupplier)
手动创建的状态存储更改日志主题无效。对于这些主题,我仍然可以看到复制因子已设置为默认值1。也无法使用Materialized#withLoggingEnabled(Map<String,String>)
进行设置,因为它仅接受主题级别的配置(replication.factor
是Streams配置)。这是Kafka Streams中的已知错误吗?我什么都找不到。如果是这样,是否有解决方法来增加这些变更日志主题的复制因子?
我们在经纪人方面使用Kafka v2.3.1,在客户端方面使用2.5.0。
解决方法
从版本2.4开始,AdminClient
现在可以在NewTopic
中将复制因子设置为-1,这意味着在创建主题时应使用default.replication.factor
-KIP-464
但是,似乎Kafka Streams当前未使用此功能;有一个公开发行人KAFKA-8531。
您可以使用设置内部主题的复制因子
StreamsConfig.REPLICATION_FACTOR_CONFIG)
https://kafka.apache.org/documentation/#replication.factor
由流处理应用程序创建的更改日志主题和重新分区主题的复制因子。
由于您正在通过活页夹配置进行设置,因此它应该可以正常工作。
编辑
您正在使用哪个版本的spring-cloud-stream?我刚刚用3.0.8进行了测试,它可以按预期工作。
spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 3
2020-10-15 12:03:55,601错误[kafka-stre] oakspiStreamThread:673-流线程[kafka-streams-inventory-processor-b8d07a5a-f3c4-476a-a265-119163d2acb7-StreamThread-1]在处理过程中遇到以下意外的Kafka异常,这通常表明Streams内部错误: org.apache.kafka.streams.errors.StreamsException:无法创建主题kafka-streams-inventory-processor-inventory-counts-changelog。
由以下原因引起:org.apache.kafka.common.errors.InvalidReplicationFactorException:复制因子:比可用代理大1:3。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。