如何解决错误运行使用者时发生未知错误:org.apache.kafka.common.errors.SerializationException:未知魔术字节
正在尝试从python向kafka消费者发送消息。但是由于ERROR Unknown error when running consumer: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
而出现错误
Python可以正确地从twitter api获取数据,但是无法将消息发送给使用者。
任何建议都会有所帮助。
代码:
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import KafkaProducer
access_token = "xxxx"
access_token_secret = "xxxx"
consumer_key = "xxxx"
consumer_secret = "xxxx"
class StdOutListener(StreamListener):
def on_data(self,data):
producer.send("twitter",data.encode('utf-8'))
print (data)
return True
def on_error(self,status):
print (status)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
l = StdOutListener()
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_token_secret)
stream = Stream(auth,l)
stream.filter(track="Bitcoin")
消费者:
kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter
我也尝试提供属性,
afka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter --property print.key=true --property print.value=true --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --key-deserializer org.apache.kafka.common.serialization.StringDeserializer
错误:
[2020-08-14 07:23:52,305] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解决方法
错误似乎认为它正在收到Proto消息
反序列化 Protobuf 消息时出错
但是您正在使用kafka-avro-console-consumer
同时,您的代码正在发送utf-8编码的字符串,因此您不需要任何Confluent工具,只需kafka-console-consumer
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。