如何解决使用spring-kafka-test中的@EmbeddedKafka测试监听器
我正在尝试在使用@KafkaListener创建的springboot中测试侦听器 但是侦听器始终在localhost:9092上侦听,而不是使用此embededKafka
我的听众是这样的:
@Component
@Slf4j
class SomeListener {
private final List<String> receivedMessages = new ArrayList<>();
@KafkaListener(topics = "some-ultra-cool-topic")
public void onKafkaMessage(String theMessage) {
log.info("Message received {}",theMessage);
receivedMessages.add(theMessage);
}
Collection<String> getAll() {
return unmodifiableCollection(receivedMessages);
}
}
然后像这样进行spock测试:
@SpringBootTest
@EmbeddedKafka
@TestPropertySource(properties = ['spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}','spring.kafka.consumer.auto-offset-reset=earliest'])
class SomeListenerTest extends Specification {
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker
@Autowired
SomeListener someListener
void 'should receive message'() {
given:
def sender = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String,String>(KafkaTestUtils.producerProps(embeddedKafkaBroker)))
when:
sender.send('some-ultra-cool-topic','first message content')
then:
someListener.all.size() == 1
}
}
我的application.yaml没有配置引导服务器-因此,它完全是spring-boot的默认设置。
我在日志中看到生产者正在向代理发送消息(它每次都在不同的随机端口上启动)。 但是侦听器总是尝试连接到本地主机上的代理:9092
如何配置它以使用此嵌入式产品?
解决方法
感谢@sawim的提示
实际问题正在测试中。我最终使用lib org.awaitility:awaitility
进行了此测试 then:
waitAtMost(5,SECONDS)
.untilAsserted({ ->
assertThat(personFacade.findAll(),hasSize(1))
})
第一个示例中的配置有效,但是在启动期间,我可以看到尝试连接到localhost:9200的kafka日志-似乎我们可以忽略它
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。