如何解决如何使用Spring Boot实现一次交付消费者语义 我当前的使用者配置是:下面是我的消费者代码库:选项1如第一篇文章所述选项2如第二篇文章所述
我浏览了以下两个链接:- https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
- https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e
它们都有两种不同的方法来实现“至多一次”的消费者语义,所以我有点困惑 哪个跟随。
我当前的使用者配置是:
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.bootstrap-servers=<server port>
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
下面是我的消费者代码库:
@KafkaListener(topics = TOPIC,groupId = consumerGroupId)
public void streamListener(@Payload String message,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long producerTimestamp) {
parseMessage(message,producerTimestamp);
}
}
我的parseMessage()
所做的是,它处理消息并将其保存到NOSQL数据库中。
解决方法
您是正确的,两篇文章都提到了在用户端实现“至多一次”语义的稍微不同的配置。但是,实际上它们在方法上是平等的。
“最多一次语义”的思想可以概括为
要“最多一次”传递,请在
poll()
之后立即提交消息,然后处理消息。
现在,这可以通过两种方式来实现,要么使用自动提交,要么通过确保在commitSync()
调用之后和进一步处理数据之前立即调用poll()
。
选项1(如第一篇文章所述)
-
将
enable.auto.commit
设置为true。 -
将
auto.commit.interval.ms
设置为较低的时间范围。 (备注:应设置为最小值) -
并且不要拨打
consumer.commitSync()
;来自消费者。
使用此使用者配置,Kafka将在指定间隔自动提交偏移量。
选项2(如第二篇文章所述)
-
使用者应用程序将
enable.auto.commit
设置为false -
,并且已编程为在写入数据库之前将其偏移量重新提交给Kafka 。
您会看到,两篇文章都具有相同的总体思想,并且与往常一样,有多种实现方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。