如何解决Avro Producer发送没有密钥架构的密钥
我在Python 2.7中使用Avro Producer。我需要发送带有键和值的消息, 该值在主题中具有Avro-Schema,但该键没有Avro-Schema(由于键的原因,我无法添加Schema-传统原因)。
这是我的代码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com','priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,'schema.registry.url': schemaRegistry
},default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic,value=value,key=key)
avroProducer.flush()
我收到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从生产函数中删除密钥:
avroProducer.produce(topic=topic,value=value)
有效。
如何在没有模式的情况下发送密钥?
解决方法
您需要使用常规Producer并自己执行序列化功能
from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema # extract function definition
value_schema = avro.load('avro_schemas/value.avsc') # TODO: Create avro_schemas folder
p = Producer({'bootstrap.servers': bootstrap_servers})
value_payload = serialize_avro(topic,value_schema,value,is_key=False)
p.produce(topic,key=key,value=value_payload,callback=delivery_report)
,
AvroProducer
假定键和值都使用架构注册表进行编码,并且在键和值的有效载荷之前添加了魔术字节和架构ID。
如果要对密钥使用自定义序列化,则可以使用Producer
而不是AvroProducer
。但是,您有责任序列化密钥(使用所需的任何格式)和值(这意味着对值进行编码并在魔术字节和模式ID之前)序列化。要了解如何完成此操作,可以查看AvroProducer
代码。
但这也意味着您将必须编写自己的AvroConsumer
,并且将无法使用kafka-avro-console-consumer
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。