如何解决暂停和恢复KafkaConsumer
如果在使用消息期间抛出错误,我要做的就是暂停KafkaConsumer
。
这是我写的
@KafkaListener(...)
public void consume(
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String,String> consumer,@Payload String message) {
try {
//consumer message
} catch(Exception e) {
saveConsumer(consumer);
consumer.pause();
}
}
然后我写了一个REST服务以恢复消费者
@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
@PostMapping("/resume")
public void resume() {
KafkaConsumer<String,String> consumer = getConsumer();
if(consumer != null) {
consumer.resume(consumer.paused());
}
}
}
现在,我有两个问题。
第一个问题:当我从带有注释的方法@KafkaListener
调用consumer.pause()时会发生什么?
使用者立即暂停,或者我可以接收与同一主题分区的其他偏移量关联的其他消息。
例如,我有偏移量3的“ message1”和偏移量4的“ message2”,“ message1”导致异常,“ message2”会怎样?反正消耗掉了吗?
第二个问题:因为ConcurrentModificationException
不是线程安全的,所以从REST服务恢复使用者得到KafkaConsumer
。那么,我该怎么做呢?
解决方法
请勿直接暂停消费者;而是暂停容器。
@KafkaListener(id = "foo",...)
@Autowired KafkaListenerEndpointRegistry;
...
registry.getListenerContainer("foo").pause();
暂停将在下一次民意调查之前生效;如果您想立即暂停(而不处理上一次民意调查中的剩余记录),请在暂停后抛出一个例外(假设您使用的是SeekToCurrentErrorHandler
,现在是默认值)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。