如何解决MassTransit.Kafka 批处理
我正在使用 MassTransit.Kafka 批量生产和消费消息。当我尝试一条一条消费消息时一切正常,但是当我尝试批量消费消息时出现错误:
Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> System.InvalidOperationException: Exception creating proxy (GreenPipes.DynamicInternal.MassTransit.Batch<Aiforfit.WSW.DataStructures.Events.UserEvent>) for MassTransit.Batch<Aiforfit.WSW.DataStructures.Events.UserEvent>
---> System.TypeLoadException: Method 'get_Item' in type 'GreenPipes.DynamicInternal.MassTransit.Batch<Aiforfit.WSW.DataStructures.Events.UserEvent>' from assembly 'MassTransitGreenPipes.DynamicInternal3c37dde6a7c744b796f7ac1cf544383b,Version=0.0.0.0,Cul
ture=neutral,PublicKeyToken=null' does not have an implementation.
看起来像是 NewtonSoft 去土地化错误,但一切都根据 MassTransit 文档完成。我尝试将 UserEvent 转换为 Interface,因为文档中的每个模型都是接口,但它没有帮助。 配置:
public static IServiceCollection AddKafka(this IServiceCollection services,IConfigurationSection section)
{
var config = section.Get<EventMessagingOptions>().Kafka;
services.AddMassTransitHostedService();
services.AddMassTransit(x =>
{
x.UsingInMemory((context,cfg) =>
{
cfg.ConfigureEndpoints(context);
cfg.UseRawJsonSerializer();
});
x.AddRider(rider =>
{
rider.AddConsumer<UserEventConsumer>(typeof(UserEventConsumerDefinition));
rider.UsingKafka((ctx,k) =>
{
k.SecurityProtocol = config.SecurityProtocol;
k.Host(config.Host,configurator =>
{
configurator.UseSasl(saslConfigurator =>
{
saslConfigurator.Username = config.Username;
saslConfigurator.Password = config.Password;
saslConfigurator.Mechanism = config.SaslMechanism;
});
});
k.TopicEndpoint<Batch<UserEvent>>(config.Topics.UserEvent,config.Topics.UserEventGroupId,e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer<UserEventConsumer>(ctx);
});
});
});
});
return services;
}
public class UserEventConsumerDefinition : ConsumerDefinition<UserEventConsumer>
{
public UserEventConsumerDefinition()
=> Endpoint(x => x.PrefetchCount = 500);
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,IConsumerConfigurator<UserEventConsumer> consumerConfigurator)
{
consumerConfigurator.Options<BatchOptions>(options => options
.SetMessageLimit(500)
.SetConcurrencyLimit(25));
}
}
public class UserEventConsumer : IConsumer<Batch<UserEvent>>
{
private readonly ICluster _cluster;
public UserEventConsumer(ICluster cluster)
=> _cluster = cluster;
public async Task Consume(ConsumeContext<Batch<UserEvent>> context)
{
Console.WriteLine(context.Message.Length);
}
}
public class UserEvent
{
public Guid EventId { get; set; } = Guid.NewGuid();
public Guid UserId { get; set; }
public string Test { get; set; }
}
看起来像是 NewtonSoft 去土地化错误,但一切都根据 MassTransit 文档完成。我尝试将 UserEvent 转换为 Interface,因为文档中的每个模型都是接口,但它没有帮助。
解决方法
在主题端点的情况下,您仍然为 TopicEndpoint 指定 TopicEndpoint<UserEvent>
,并在您的消费者中使用 Batch<UserEvent>
。在主题端点上配置后,使用 ConfigureConsumer<UserEventConsumer>(ctx)
它将正确处理一批事件到您的使用者的映射。
假设您使用的是最新版本的 MassTransit,它应该增加主题端点上的 ConcurrentMessageLimit 以匹配批量消息容量。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。