为什么我的 MassTransit 故障消费者(Fault Consumer)没有被调用?
我将 MassTransit 7.1.4.0 与 RabbitMQ 3.8.7 一起使用。
我有一个只会抛出异常的消费者,还有一个故障消费者,它只是用 Console.WriteLine 输出"发生了故障"这一事实。
但故障消费者从未被调用。我还尝试过使用故障消费者定义(ConsumerDefinition),同样不起作用。
以下是整个程序。
using System;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Microsoft.Extensions.DependencyInjection;
namespace FaultConsumer
{
/// <summary>
/// Console entry point: registers the consumers, starts the RabbitMQ bus,
/// publishes a single <see cref="SubmitOrder"/> message and waits for a key press.
/// </summary>
internal class Program
{
    private static async Task Main(string[] args)
    {
        Console.WriteLine("Hello Fault Consumer!");

        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>(typeof(MyConsumerDefinition));
            // Registering with typeof(MyFaultConsumerDefinition) was also tried, with the same result.
            x.AddConsumer<MyFaultConsumer>();
            x.SetKebabCaseEndpointNameFormatter();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("rabbitmq://localhost");
                cfg.ConfigureEndpoints(context);
            });
        });

        var serviceProvider = services.BuildServiceProvider();
        var bus = serviceProvider.GetRequiredService<IBusControl>();

        await bus.StartAsync();
        try
        {
            // UTC avoids an ambiguous local-time stamp on a message that crosses process boundaries.
            await bus.Publish(new SubmitOrder { DateTimeStamp = DateTime.UtcNow });

            Console.WriteLine("Press any key to exit");
            await Task.Run(() => Console.ReadKey());
        }
        finally
        {
            // Stop the bus even when publishing or the key wait throws.
            await bus.StopAsync();
        }
    }
}
/// <summary>
/// Message published by <c>Program.Main</c> and consumed by <c>MyConsumer</c>.
/// </summary>
internal class SubmitOrder
{
    /// <summary>Timestamp assigned by the publisher when the message is created.</summary>
    public DateTime DateTimeStamp { get; set; }
}
/// <summary>
/// Consumer of <see cref="SubmitOrder"/> that always fails: it logs the retry
/// information reported by the context and then throws.
/// </summary>
internal class MyConsumer : IConsumer<SubmitOrder>
{
    /// <summary>
    /// Writes the current retry count and retry attempt to the console, then throws
    /// an <see cref="Exception"/> whose message is the retry count.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    /// <exception cref="Exception">Always thrown.</exception>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        var retryCount = context.GetRetryCount();
        var retryAttempt = context.GetRetryAttempt();

        Console.WriteLine($"Attempting to consume {retryCount} {retryAttempt}");

        // This consumer exists only to fail, so that a fault is produced.
        throw new Exception(retryCount.ToString());
    }
}
/// <summary>
/// Definition for <see cref="MyConsumer"/>: sets the endpoint name, the concurrent
/// message limit, and the retry/outbox middleware of its receive endpoint.
/// </summary>
internal class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
{
    private const string OrderEndpointName = "order-service-202102081221";
    private const int MaxConcurrentMessages = 8;
    private const int ImmediateRetryLimit = 5;

    public MyConsumerDefinition()
    {
        // Override the default endpoint name and cap concurrent consumption.
        ConcurrentMessageLimit = MaxConcurrentMessages;
        EndpointName = OrderEndpointName;
    }

    /// <summary>
    /// Adds an immediate-retry policy followed by the in-memory outbox to the endpoint.
    /// An interval-based policy, <c>r.Intervals(100, 200, 500, 800, 1000)</c>, was the
    /// alternative previously tried here.
    /// </summary>
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MyConsumer> consumerConfigurator)
    {
        // Registration order is kept as-is: retry first, then the outbox.
        endpointConfigurator.UseMessageRetry(retry => retry.Immediate(ImmediateRetryLimit));
        endpointConfigurator.UseInMemoryOutbox();
    }
}
/// <summary>
/// Fault consumer that reports a fault by writing a line to the console.
/// </summary>
/// <remarks>
/// NOTE(review): as written, this consumer is never invoked. Faults are published
/// per message type, not per consumer type, so the subscription would need to be
/// <c>Fault&lt;SubmitOrder&gt;</c> rather than <c>Fault&lt;MyConsumer&gt;</c>.
/// </remarks>
internal class MyFaultConsumer : IConsumer<Fault<MyConsumer>>
{
    private const string FaultNotice = "Fault";

    /// <summary>Writes "Fault" to the console.</summary>
    /// <param name="context">Consume context of the fault message.</param>
    public async Task Consume(ConsumeContext<Fault<MyConsumer>> context)
    {
        Console.WriteLine(FaultNotice);
        await Task.CompletedTask;
    }
}
// class MyFaultConsumerDefinition : ConsumerDefinition<MyFaultConsumer>
// {
// public MyFaultConsumerDefinition()
// {
// // override the default endpoint name
// EndpointName = "order-service-faults-202102081221";
// // limit the number of messages consumed concurrently
// // this applies to the consumer only,not the endpoint
// ConcurrentMessageLimit = 8;
// }
// protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MyFaultConsumer> consumerConfigurator)
// {
// // configure message retry with millisecond intervals
// endpointConfigurator.UseMessageRetry(r => r.Intervals(100,1000));
// // use the outbox to prevent duplicate events from being published
// endpointConfigurator.UseInMemoryOutbox();
// }
// }
}
解决方法
故障(Fault)是按消息类型发布的,而不是按消费者类型发布的。
因此,您的故障消费者应该消费 Fault<SubmitOrder>,而不是 Fault<MyConsumer>。
/// <summary>
/// Fault consumer bound to the message type: it handles <c>Fault&lt;SubmitOrder&gt;</c>,
/// because faults are published per message type rather than per consumer type.
/// </summary>
internal class MyFaultConsumer : IConsumer<Fault<SubmitOrder>>
{
    private const string FaultNotice = "Fault";

    /// <summary>Writes "Fault" to the console and completes synchronously.</summary>
    /// <param name="context">Consume context of the fault message.</param>
    /// <returns>An already-completed task.</returns>
    public Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        Console.WriteLine(FaultNotice);
        return Task.CompletedTask;
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。