如何解决MassTransit和RabbitMq在发生错误时未重试
我在使用Rabbitmq进行MassTransit上的重试时遇到了问题。
我有代码配置:
private static void ConfigureMassTransitWithRabbitMq(this IServiceCollection services,string host,string username,string pwd,string cluster,Assembly assembly)
{
services.AddMassTransit(config =>
{
config.AddConsumers(assembly);
config.SetKebabCaseEndpointNameFormatter();
config.UsingRabbitMq((context,rabbitMqConfig) =>
{
rabbitMqConfig.UseRetry(e => Retry.Immediate(3));
rabbitMqConfig.ConfigureEndpoints(context);
rabbitMqConfig.ConfigurePublish(pipe =>
{
pipe.UseExecute(context1 =>
{
if (context1.CorrelationId == null)
{
context1.CorrelationId = Guid.NewGuid();
}
});
});
rabbitMqConfig.Host(new Uri(host),hst =>
{
hst.Username(username);
hst.Password(pwd);
hst.PublisherConfirmation = true;
hst.Heartbeat(5);
hst.UseCluster(c =>
{
var clusters = cluster?.Split(';');
if (cluster == null || clusters.Length == 0)
return;
foreach (var item in clusters)
c.Node(item);
});
});
rabbitMqConfig.ManagementEndpoint((conf) =>
{
conf.ConfigurePublish((p) =>
{
p.UseRetry((r) =>
{
r.Exponential(3,TimeSpan.FromSeconds(5),TimeSpan.FromSeconds(15),TimeSpan.FromSeconds(5));
});
});
});
});
});
services.AddMassTransitHostedService();
}
我通过组装添加了消费者。
我只是尝试使用:
-
rabbitMqConfig.UseRetry(e => Retry.Immediate(3));
配置MassTransit时
但是在这种情况下,de配置保持无限循环
我尝试使用这个:
try
{
var order = _mapper.Map<object>(context.Message);
await Execute(order);
await context.ConsumeCompleted;
}
catch (Exception)
{
var retry = context.GetRetryCount();
if (maxAttempts > 3)
{
throw;
}
await context.Redeliver(TimeSpan.FromSeconds(5));
}
然后将这段代码放在MassTransitConfiguration中:
rabbitMqConfig.UseDelayedExchangeMessageScheduler();
在最后一种情况下,我得到了重试,但是我没有得到重试的数量,因此我无法控制它。我也尝试通过标头和其他属性来获得此金额,但我总是得到0。
有人可以帮助我吗?!
解决方法
您的配置顺序错误,根据上面的示例更正的配置如下所示:
private static void ConfigureMassTransitWithRabbitMq(this IServiceCollection services,string host,string username,string pwd,string cluster,Assembly assembly)
{
services.AddMassTransit(config =>
{
config.AddConsumers(assembly);
config.SetKebabCaseEndpointNameFormatter();
config.UsingRabbitMq((context,rabbitMqConfig) =>
{
rabbitMqConfig.Host(new Uri(host),hst =>
{
hst.Username(username);
hst.Password(pwd);
hst.Heartbeat(5);
hst.UseCluster(c =>
{
var clusters = cluster?.Split(';');
if (cluster == null || clusters.Length == 0)
return;
foreach (var item in clusters)
c.Node(item);
});
});
rabbitMqConfig.ConfigurePublish(pipe =>
{
pipe.UseExecute(context1 =>
{
context1.CorrelationId ??= Guid.NewGuid();
});
});
rabbitMqConfig.UseRetry(e => Retry.Immediate(3));
rabbitMqConfig.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
}
我还删除了不需要的所有内容,因为它是默认行为,或者未使用。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。