如何解决MassTransit:当消息类继承另一个类和/或实现接口时,多次调用通用 SendFilter/PublishFilter
首先,请原谅我的英语很糟糕。我将 MassTransit 与 Azure 服务总线一起用于微服务之间的异步通信。我需要使用中间件过滤器向消息添加元数据。问题是 Send() 和 Publish() 的过滤器为消息继承的每个类以及它实现的每个接口实例化一次。 要注册过滤器,我使用扩展方法“UseSendFilter()/UsePublishFilter()”。
有没有办法只为消息的具体类型调用发送/发布过滤器?
我留下了一个例子,希望可以帮助你理解问题。
发送过滤器
public class SendFilterTest<T> : IFilter<SendContext<T>> where T : class
{
private readonly ILogger<SendFilterTest<T>> _logger;
public SendFilterTest(ILogger<SendFilterTest<T>> logger = null)
{
_logger = logger ?? NullLogger<SendFilterTest<T>>.Instance;
}
public async Task Send(SendContext<T> context,IPipe<SendContext<T>> next)
{
try
{
_logger.LogInformation($"SendFilterInstance: {this.GetHashCode()}. MessageId: {context.MessageId}. NextType: {next.GetType().GetFriendlyName(shortName:true)} ");
}
finally
{
// here the next filter in the pipe is called
await next.Send(context);
}
}
public void Probe(ProbeContext context)
{
context.CreateScope(this.GetType().Name);
}
}
过滤器注册通用方法
//in this case sendFilterTypes = new List<Type>() { typeof(SendFilterTest<>) }
...
//send filters middleware register
if (sendFilterTypes != null)
{
foreach (var sendFilterType in sendFilterTypes.Where(t => t != null))
{
cfg.UseSendFilter(sendFilterType,context);
}
}
//publish filters middleware register
if (publishFilterTypes != null)
{
foreach (var publishFilterType in publishFilterTypes.Where(t => t != null))
{
cfg.UsePublishFilter(publishFilterType,context);
}
}
cfg.ReceiveEndpoint(shortMyQueueName,e =>
{
e.ConfigureConsumers(context);
});
...
发送的消息及其接口
public sealed class TestIntegrationCommand : IOutcomingCommand
{
public Uri GetDestinationQueueUri()
{
return new($"queue:test-queue");
}
}
[ExcludeFromTopology]
public interface IOutcomingCommand : ICommand
{
[Obsolete("Use property 'Id' instead")]
Guid CommandId => Id;
Uri GetDestinationQueueUri();
}
[ExcludeFromTopology]
public interface ICommand
{
Guid Id => Guid.NewGuid();
}
发送方法
...
var testCommand = new TestIntegrationCommand();
await (await bus.GetSendEndpoint(testCommand.GetDestinationQueueUri())).Send(testCommand);
...
输出
[2021-04-07 20:15:49Z] info: UnitTests.DependencyInjection.CqrsServicesTest[0]
StartTest
[2021-04-07 20:15:49Z] info: MassTransit[0]
Configured endpoint test-queue,Consumer: UnitTests.TestDomain.TestAggregateConsumer
[2021-04-07 20:15:49Z] info: MassTransit[0]
Configured endpoint test-queue,Consumer: UnitTests.TestDomain.TestEvent1Consumer
[2021-04-07 20:15:50Z] info: MassTransit[0]
Bus started: sb://tms-testing.servicebus.windows.net/
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
SendFilterInstance: 17664976. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<UnitTests.TestDomain.TestIntegrationCommand>,SendContext<Tms.Framework.Cqrs.Core.Commands.ICommand>>
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
SendFilterInstance: 39332178. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<Tms.Framework.Cqrs.Core.Commands.Outcoming.IOutcomingCommand>,SendContext<Tms.Framework.Cqrs.Core.Commands.ICommand>>
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
SendFilterInstance: 26669753. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: MergePipe<SendContext<UnitTests.TestDomain.TestIntegrationCommand>,SendContext<Tms.Framework.Cqrs.Core.Commands.Outcoming.IOutcomingCommand>>
[2021-04-07 20:15:50Z] info: UnitTests.Filters.SendFilterTest[0]
SendFilterInstance: 15215027. MessageId: fc9e0000-01e6-64c9-bf0f-08d8fa01f04d. NextType: Last<SendContext<UnitTests.TestDomain.TestIntegrationCommand>>
[2021-04-07 20:15:51Z] info: UnitTests.TestDomain.TestAggregateConsumer[0]
Consuming 'TestIntegrationCommand'. Id:UnitTests.TestDomain.TestIntegrationCommand. Count: 0. DO NOTHING!
[2021-04-07 20:15:51Z] info: MassTransit[0]
Bus stopped: sb://tms-testing.servicebus.windows.net/
[2021-04-07 20:15:52Z] info: UnitTests.DependencyInjection.CqrsServicesTest[0]
FinishTest
非常感谢。
问候
博尔哈
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。