How to test Apache Kafka integration in a Spring Boot application using JUnit 5 and EmbeddedKafkaBroker
I have a simple producer class, defined as follows:
@Configuration
public class MyKafkaProducer {

    private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
        listenableFuture.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent data {}", result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}", data, ex.getMessage());
            }
        });
    }
}
Here is the test class, which is still a work in progress:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;
    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();
        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}
The problem is that the test creates a default producer:
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
How can I use my own producer, MyKafkaProducer, and call its sendDataToKafka method instead? How should this case be tested?
The source code can be found here. The work-in-progress test branch is here. Thank you.
Solution
This is a Spring Boot application, and you are using the auto-configured KafkaTemplate.
To override bootstrap-servers so that the template connects to the embedded Kafka broker, see https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation
@EmbeddedKafka(topics = "someTopic",bootstrapServersProperty = "spring.kafka.bootstrap-servers")
Then you can call your producer from the test case.
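As a rough illustration of that advice, a test along the following lines might work. It is only a sketch under several assumptions: it targets the spring-kafka 2.5.x line linked above, a @SpringBootApplication class exists in the same package hierarchy so that @SpringBootTest can load the context, and Spring Boot's default String serializers are in effect. The class name MyKafkaProducerSpringBootTest, the consumer group "test-consumer", and the inline override of my.kafka.producer.topic are made up for the example.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Assumption: the topic property is overridden here so that MyKafkaProducer
// writes to the same topic the embedded broker creates.
@SpringBootTest(properties = "my.kafka.producer.topic=device")
// Points the auto-configured KafkaTemplate at the embedded broker.
@EmbeddedKafka(topics = "device", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyKafkaProducerSpringBootTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    // The application's own producer bean, not a hand-built Producer.
    @Autowired
    private MyKafkaProducer producer;

    @Test
    void sendDataToKafkaPublishesToTopic() {
        String payload = "{\"event\":\"Test Event\"}";

        // Build a throwaway consumer against the embedded broker.
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("test-consumer", "false", embeddedKafkaBroker));
        // Read from the start of the topic so the send/subscribe order does not matter.
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
                configs, new StringDeserializer(), new StringDeserializer()).createConsumer()) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, TOPIC);

            // Act: go through the class under test.
            producer.sendDataToKafka(payload);

            // Assert: exactly one record arrives, carrying the payload.
            ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
            assertThat(record.value()).isEqualTo(payload);
        }
    }
}
```

Because sendDataToKafka sends only a value, the record key is expected to be null, so this sketch asserts on the value alone rather than on "my-aggregate-id".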