Why does a code change to my MassTransit application cause MassTransit.RabbitMqTransport.RabbitMqConnectionException?
(Follow-up to "Why does my MassTransit Fault Consumer not get called?")
I am using MassTransit 7.1.4.0 with RabbitMQ 3.8.7.
After I made a change to my C# code, I get the following error (detailed error message below):
Unhandled exception. MassTransit.RabbitMqTransport.RabbitMqConnectionException: ReceiveTransport faulted: localhost:5672/
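Code change
- Added the keyword public to the class definitions
What I have tried
- Undoing the code change
- Renaming the endpoint name
- Deleting the queues and exchanges in RabbitMQ Management at http://localhost:15672/
- Rebooting
- Stopping and starting the RabbitMQ Windows service
None of these attempts worked.
Application
Below is the C# application: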
using System;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Microsoft.Extensions.DependencyInjection;

namespace FaultConsumer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello Fault Consumer!");
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.AddConsumer<MyConsumer>(typeof(MyConsumerDefinition));
                //x.AddConsumer<MyFaultConsumer>();//typeof(MyFaultConsumerDefinition));
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("rabbitmq://localhost");
                    cfg.ConfigureEndpoints(context);
                });
            });
            var serviceProvider = services.BuildServiceProvider();
            var bus = serviceProvider.GetRequiredService<IBusControl>();
            await bus.StartAsync();
            await bus.Publish(new SubmitOrder() { DateTimeStamp = DateTime.Now });
            Console.WriteLine("Press any key to exit");
            await Task.Run(() => Console.ReadKey());
            await bus.StopAsync();
        }
    }

    //CHANGE I changed this to public (and back)
    class SubmitOrder
    {
        public DateTime DateTimeStamp { get; set; }
    }

    //CHANGE I changed this to public (and back)
    class MyConsumer : IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            Console.WriteLine($"Attempting to consume {context.GetRetryCount()} {context.GetRetryAttempt()}");
            throw new Exception(context.GetRetryCount().ToString());
        }
    }

    //CHANGE I changed this to public (and back)
    class MyConsumerDefinition : ConsumerDefinition<MyConsumer>
    {
        public MyConsumerDefinition()
        {
            EndpointName = "order-service-202102081403";
            ConcurrentMessageLimit = 8;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<MyConsumer> consumerConfigurator)
        {
            endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
            //endpointConfigurator.UseMessageRetry(r => r.Immediate(5));
            endpointConfigurator.UseInMemoryOutbox();
        }
    }

    //CHANGE I changed this to public (and back)
    class MyFaultConsumer : IConsumer<Fault<SubmitOrder>>
    {
        public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
        {
            Console.WriteLine("Fault");
            await Task.CompletedTask;
        }
    }

    //CHANGE I changed this to public (and back)
    // class MyFaultConsumerDefinition : ConsumerDefinition<MyFaultConsumer>
    // {
    //     public MyFaultConsumerDefinition()
    //     {
    //         // override the default endpoint name
    //         EndpointName = "order-service-faults-202102081342";
    //         // limit the number of messages consumed concurrently
    //         // this applies to the consumer only, not the endpoint
    //         ConcurrentMessageLimit = 8;
    //     }
    //     protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
    //         IConsumerConfigurator<MyFaultConsumer> consumerConfigurator)
    //     {
    //         // configure message retry with millisecond intervals
    //         endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 1000));
    //         // use the outbox to prevent duplicate events from being published
    //         endpointConfigurator.UseInMemoryOutbox();
    //     }
    // }
}
Detailed error message
Unhandled exception. MassTransit.RabbitMqTransport.RabbitMqConnectionException: ReceiveTransport faulted: localhost:5672/
---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason,initiated by Peer,code=406,text='PRECONDITION_FAILED - inequivalent arg 'durable'
for exchange 'FaultConsumer:SubmitOrder' in vhost '/': received 'false' but current is 'true'',classId=40,methodId=10
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method,ContentHeaderBase header,Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange,String type,Boolean passive,Boolean durable,Boolean autoDelete,Boolean internal,Boolean nowait,IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange,IDictionary`2 arguments)
at MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext.<>c__DisplayClass16_0.<MassTransit.RabbitMqTransport.ModelContext.ExchangeDeclare>b__0()
at MassTransit.Util.ChannelExecutor.<>c__DisplayClass10_0.<Run>g__RunMethod|0()
at MassTransit.Util.ChannelExecutor.<>c__DisplayClass10_0.<Run>b__1()
at MassTransit.Util.ChannelExecutor.SynchronousFuture`1.Run()
--- End of stack trace from previous location ---
at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method,CancellationToken cancellationToken)
at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.ConfigureTopology(ModelContext context)
at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext()
--- End of stack trace from previous location ---
at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context,Func`2 setupMethod,PayloadFactory`1 payloadFactory)
at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ModelContext>.Send(ModelContext context,IPipe`1 next)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe,CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe,CancellationToken cancellationToken)
at MassTransit.Transports.ReceiveTransport`1.ReceiveTransportAgent.RunTransport()
--- End of inner exception stack trace ---
at MassTransit.Transports.ReceiveTransport`1.ReceiveTransportAgent.RunTransport()
at MassTransit.Transports.ReceiveTransport`1.ReceiveTransportAgent.Run()
at MassTransit.Transports.StartHostHandle.EndpointsReady(Task`1[] endpoints)
at MassTransit.Transports.StartHostHandle.ReadyOrNot(Task`1[] endpoints,Task`1[] riders)
at MassTransit.MassTransitBus.Handle.ReadyOrNot(Task`1 ready)
at MassTransit.MassTransitBus.StartAsync(CancellationToken cancellationToken)
at MassTransit.MassTransitBus.StartAsync(CancellationToken cancellationToken)
at FaultConsumer.Program.Main(String[] args) in D:\code\masstransit\faultconsumer\Program.cs:line 52
at FaultConsumer.Program.<Main>(String[] args)
Solution
By making the message class public (it previously had no access modifier), you expressed intent to MassTransit that the message type is a public message contract.
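Public message contracts are not auto-deleted.
Private message contracts are auto-deleted.
By changing the visibility, you changed the type, and it no longer matches the exchange already created on the broker.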
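The rule above can be illustrated with a minimal, self-contained sketch. It is not MassTransit's actual implementation: `ExchangeFlags`, `PublicOrder`, and `InternalOrder` are hypothetical names, and using `Type.IsVisible` as the test for a public contract is an assumption based on the behaviour described here.

```csharp
using System;

namespace VisibilitySketch
{
    // Stand-ins for message contracts; only their visibility differs.
    public class PublicOrder { }
    class InternalOrder { } // no modifier: internal at namespace level

    static class ExchangeFlags
    {
        // Hypothetical helper mirroring the rule described in the answer:
        // a visible (public) type maps to a durable exchange that is kept,
        // any other type maps to a temporary exchange that is auto-deleted.
        public static (bool Durable, bool AutoDelete) For(Type messageType)
        {
            bool isPublicContract = messageType.IsVisible;
            return (Durable: isPublicContract, AutoDelete: !isPublicContract);
        }
    }

    static class Program
    {
        static void Main()
        {
            foreach (var type in new[] { typeof(PublicOrder), typeof(InternalOrder) })
            {
                var (durable, autoDelete) = ExchangeFlags.For(type);
                Console.WriteLine($"{type.Name}: durable={durable}, autoDelete={autoDelete}");
            }
        }
    }
}
```

Under this assumption, toggling the `public` keyword on `SubmitOrder` flips the `durable` argument the application sends when it declares the exchange, which is the argument the broker rejects in the error message.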
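You need to delete the mismatched exchange from the broker before restarting your application. That eliminates the startup failure caused by the existing broker configuration being invalid.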
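As an alternative to the management UI, the exchange could be removed from code. The following is a sketch only, assuming the RabbitMQ.Client 6.x package, a local broker with the default guest account, and the exchange name taken from the error message; `DeleteStaleExchange` is a hypothetical name. It would be run while the MassTransit application is stopped.

```csharp
using System;
using RabbitMQ.Client;

static class DeleteStaleExchange
{
    static void Main()
    {
        // Assumption: default local broker (guest/guest) on vhost "/".
        var factory = new ConnectionFactory { HostName = "localhost", VirtualHost = "/" };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Exchange name as reported in the PRECONDITION_FAILED error.
        // ifUnused: false deletes the exchange even if it still has bindings.
        channel.ExchangeDelete("FaultConsumer:SubmitOrder", ifUnused: false);

        Console.WriteLine("Exchange deleted; restart the application so it is declared again.");
    }
}
```

On the next start, the application declares the exchange again with whatever `durable` value matches the current visibility of the message class.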
The error is detailed:
inequivalent arg 'durable' for exchange 'FaultConsumer:SubmitOrder' in vhost '/': received 'false' but current is 'true'
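To see what the broker currently holds ("current is 'true'" in the message), the exchange can be read through the RabbitMQ management HTTP API. This is a hedged sketch assuming the management plugin at http://localhost:15672, the default guest/guest credentials, and .NET Core 3.0 or later for `System.Text.Json`; `InspectExchange` and `ExchangeUrl` are hypothetical names.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

static class InspectExchange
{
    // Builds the management API URL for one exchange.
    // The vhost and the exchange name are percent-encoded ("/" becomes "%2F").
    public static string ExchangeUrl(string baseUrl, string vhost, string exchange) =>
        $"{baseUrl}/api/exchanges/{Uri.EscapeDataString(vhost)}/{Uri.EscapeDataString(exchange)}";

    static async Task Main()
    {
        var url = ExchangeUrl("http://localhost:15672", "/", "FaultConsumer:SubmitOrder");

        using var http = new HttpClient();
        // Assumption: default guest/guest account, sent as HTTP basic authentication.
        var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest"));
        http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", credentials);

        // Throws HttpRequestException if the exchange does not exist (404).
        var json = await http.GetStringAsync(url);

        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        Console.WriteLine($"durable={root.GetProperty("durable").GetBoolean()}, " +
                          $"auto_delete={root.GetProperty("auto_delete").GetBoolean()}");
    }
}
```

If the reported `durable` value differs from what the application would declare for the current class visibility, the exchange is the mismatched one that has to be deleted.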