如何解决如何在Spring Boot中延迟加载Kafka Consumer?
我想通过命令行参数提供组ID,但尝试此操作时出现以下错误。
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config,container properties,or @KafkaListener annotation; a group.id is required when group management is used.
这意味着在加载kafkalistener时,它需要group_id。如果我在consumerConfig文件中提供了groupId,则它可以正常工作。
有什么办法可以让我通过命令行给组ID以及kafka侦听器的延迟加载,以便我在程序启动时不需要。
我的ConsumerConfig:
@Configuration
class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private ArgumentModel argumentModel;
private Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
@Bean
public Map<String,Object> consumerConfigs() {
Map<String,Object> props = new HashMap<>();
logger.info("bootstrapServers : {}",bootstrapServers);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,argumentModel.getKafkaGroupId());
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
解决方法
@KafkaListener(... groupId = "${group.id}")
然后在命令行中传递-Dgroup.id=myGroup
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。