如何解决MassTransit - 异常后不会触发故障消费者
我遇到了故障消费者的问题。当我的消费者抛出异常时 - 味精被重定向到 queueName_error 队列。但我原以为我的消费者会“吃”这个消息,但这不会发生。 我正在使用 Autofac + 我与使用 NserviceBus 的外部系统交换 msg。大众运输 7.1.8.
消费者:
public class PublishStatusConsumer : IConsumer<PublishStatus>,IConsumerDomain {
private readonly ILogger _logger;
private readonly IMessageProcessorsFactory _messageProcessors;
public PublishStatusConsumer(IMessageProcessorsFactory messageProcessors,ILogger logger)
{
_logger = logger;
_messageProcessors = messageProcessors;
}
public async Task Consume(ConsumeContext<PublishStatus> context)
{
var msg = context.Message;
_logger.Debug("normal consumer");
throw new Exception("test");
}
}
我的错消费者:
public class PublishStatusFaultConsumer: IConsumer<Fault<PublishStatus>>,IConsumerDomain
{
private readonly ILogger _logger;
private readonly IConsumeContextLogger _consumeContextLogger;
public PublishStatusFaultConsumer(IConsumeContextLogger consumeContextLogger,ILogger logger)
{
_consumeContextLogger = consumeContextLogger;
_logger = logger;
}
public Task Consume(ConsumeContext<Fault<PublishStatus>> context)
{
_logger.Error(_consumeContextLogger.PrepareLog(context));
return Task.CompletedTask;
}
}
最后是我的配置(autofac):
公共类 BusModule : 模块 { 受保护的覆盖无效负载(ContainerBuilder builder) { builder.RegisterType().As();
builder.AddMassTransit(cfg =>
{
//cfg.AddConsumersFromNamespaceContaining<IConsumerDomain>();
cfg.AddConsumer<PublishStatusConsumer>();
cfg.AddConsumer<PublishStatusFaultConsumer>();
var schedulerEndpoint = new Uri("queue:scheduler");
cfg.AddMessageScheduler(schedulerEndpoint);
cfg.UsingRabbitMq((context,bus) =>
{
var busSettings = context.GetRequiredService<IBusSettings>();
bus.UseNServiceBusJsonSerializer();
bus.UseMessageScheduler(schedulerEndpoint);
bus.UseHealthCheck(context);
bus.UsePrometheusMetrics(serviceName: "myServiceName");
bus.Host(busSettings.HostAddress,busSettings.Port,busSettings.VirtualHost,null,h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
bus.ReceiveEndpoint(busSettings.QueueNameToReceiveStatuses,ec =>
{
ec.AutoDelete = busSettings.AutoDelete;
ec.Durable = busSettings.Durable;
ec.Exclusive = busSettings.Exclusive;
ec.ExchangeType = busSettings.Type;
ec.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5),TimeSpan.FromMinutes(15),TimeSpan.FromMinutes(30)));
ec.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5),TimeSpan.FromSeconds(15)));
ec.UseInMemoryOutbox();
ec.ConfigureConsumer<PublishStatusSuccessConsumer>(context);
//ec.ConfigureConsumer<PublishStatusSuccessFaultConsumer>(context);
});
bus.ConfigureEndpoints(context);
});
});
}
}
它创造了 交流:
MyNamespace:PublishStatus (binded to QueueNameToReceiveStatuses such as configuration states)
MassTransit:Fault--MyNamespace:PublishStatus-- (binded to QueueNameToReceiveStatusesFault such as configuration states)
QueueNameToReceiveStatuses (binded from MyNamespace:PublishStatus to QueueNameToReceiveStatuses)
QueueNameToReceiveStatusesFault (binded from MassTransit:Fault--MyNamespace:PublishStatus-- to QueueNameToReceiveStatusesFault)
加上我的队列(QueueNameToReceiveStatuses 和 QueueNameToReceiveStatusesFault)
我提到的 - 普通消费者工作但在抛出异常后,我收到了msg QueueNameToReceiveStatuses_error 队列但我的错不会触发所以没有人消耗味精
你能帮忙吗?感谢您的帮助!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。