如何解决使用 MassTransit 和 Kafka 配置 Avro
在 Confluent Kafka 主题中生成和消费时,如何配置 MassTransit 以使用 Avro 进行序列化/反序列化?我看到 Avro 序列化器/反序列化器在包 Confluent.SchemaRegistry.Serdes
中。欢迎提供一些代码示例。
解决方法
要将 MassTransit 配置为使用 Avro,我的做法是使用生成的类文件 (avrogen
),然后配置生产者和主题端点,如下所示:
首先,您需要为架构注册表创建客户端:
var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string,string>
{
{"schema.registry.url","localhost:8081"},});
然后,您可以配置骑手:
services.AddMassTransit(x =>
{
x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.AddProducer<string,KafkaMessage>(Topic,context => context.MessageId.ToString())
.SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
.SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
rider.UsingKafka((context,k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<string,KafkaMessage>("topic-name","consumer-group",c =>
{
c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureConsumer<KafkaMessageConsumer>(context);
c.CreateIfMissing(m =>
{
m.NumPartitions = 2;
});
});
});
});
});
您可以查看 working unit test 以查看更多详细信息。我可能应该将其添加到文档中。
我刚刚写了这个来回答这个问题,直到一个小时前我才使用 Avro。
此外,我使用 this article from Confluent 来启动和运行。链接的单元测试项目中的 docker-compose.yml
配置了所有必需的服务。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。