如何解决Kafka Python - 消费者
我在使用 Python Consumer 时遇到问题。
下面是我用来学习如何使用 python Kafka 的示例代码,它可以工作。
Producer.py
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x:
dumps(x).encode('utf-8'))
for e in range(1000):
data = {'number' : e}
producer.send('numtest',value=data)
sleep(5)
consumer.py
consumer = KafkaConsumer(
'numtest',bootstrap_servers=['localhost:9092'],auto_offset_reset='earliest',enable_auto_commit=True,group_id='my-group',value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
但是,我更改了代码以满足我的需要,而我的消费者不再工作(没有错误)。
project_producer.py
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x:dumps(x).encode('utf-8'))
while True:
Speed = fetch_all("http://datamall2.mytransport.sg/ltaodataservice/TrafficSpeedBandsv2")
SpeedS = Speed[0:2]
#now = datetime.datetime.now().strftime("%m/%d/%Y,%H:%M:%S")
for road in SpeedS:
producer.send('testing',value=road)
producer.flush()
print(f"Sending road to Kafka: {road}")
sleep(300)
project_consumer.py
consumer = KafkaConsumer(
'testing',auto_offset_reset=None,group_id='hi',consumer_timeout_ms=1000,value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
print ("Message",message)
if message is not None:
print (message.offset,message.value)
print ("Quit")
如果您有关于如何使其工作的想法,请告诉我
解决方法
我相信这条线在这里引起了问题
auto_offset_reset=None
如果您的意图是在消费者启动后读取生产者产生的消息,那么您应该像对待生产者一样将消费循环放在另一个循环中。
while True:
for message in consumer:
print ("Message",message)
if message is not None:
print (message.offset,message.value)
# You can put your stop event here. ( like after 1 hour break infinite loop and be done with consumer.)
注意:当您使用 auto_offset_reset=None 时,如果没有找到消费者组的先前偏移量,它将向消费者抛出异常。您应该在代码中正确处理异常。 (或使用最早或最新)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。