如何解决Spring Cloud Kinesis Binder如何为生产者处理错误-根据文档,它不起作用
我已经遵循以下文档,并且有一个制作人和一个消费者在Kinesis Stream上工作得很好。我想了解在发生任何异常的情况下如何处理Producer(源)中的错误。
我已经按照Spring Stream错误处理文档尝试了以下方法:
@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")
两者均不起作用。我在生产者方法中显式抛出RuntimeException,并期望它将在“ errorChannel”中,但无法接收到它。
如果有人成功完成此操作,请帮助我弄清楚或与我分享方法。
解决方法
注意:几个月前,我遇到了这个确切的问题。我们在用Kafka代替Kinesis。既然,您已经标记了spring-cloud-stream-binder-kafka,那么我在相同的地方给出输入。
您可以为生产者编写自定义的回叫,并且该回叫可以告诉您消息是失败还是成功发布。失败时,记录消息的元数据。
下面是一个小的代码段,无法更好地解释我刚才所说的回叫。
请注意,这是给卡夫卡的。相应地更改Kinesis的代码。
public class ProduceToKafka {
private ProducerRecord<String,String> message = null;
// TracerBulletProducer class has producer properties
private KafkaProducer<String,String> myProducer = TracerBulletProducer
.createProducer();
public void publishMessage(String string) {
ProducerRecord<String,String> message = new ProducerRecord<>(
"topicName",string);
myProducer.send(message,new MyCallback(message.key(),message.value()));
}
class MyCallback implements Callback {
private final String key;
private final String value;
public MyCallback(String key,String value) {
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata,Exception exception) {
if (exception == null) {
log.info("--------> All good !!");
} else {
log.info("--------> not so good !!");
log.info(metadata.toString());
log.info("" + metadata.serializedValueSize());
log.info(exception.getMessage());
}
}
}
}
我写了一篇中篇文章,介绍了在Spring抽象的不同级别处理生产者失败的情况。这可以为您提供帮助。看看这个 : https://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。