如何解决从kafka消费而没有无限循环
我目前正在使用Confluent kafka python客户端来消耗来自kafka主题的消息,并且代码在while True
循环内运行良好,如文档中的示例所示。但是,我想建立一个cron作业,每天仅从该主题消耗一次。想法是工作将在早上检查主题,在该时间点使用主题中的所有消息,然后停止。我试图像这样在python中实现这一点:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
问题在于它永远不会消耗任何东西。我猜第一行在第一次尝试就永远不会收到消息。它仅适用于while True
,但我不希望此代码无限运行,直到消耗掉该时间的最后一条消息为止。
解决方法
您可以检查循环中使用者组的偏移量,然后在到达“结束”的某个阈值之内时中断循环。
您可能还想使用max.poll.records
用户配置,以更好地控制返回的记录数
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。