如何解决Kaska Producer with MassTransit-IBusInstance未注册
我正在尝试使用MassTransit
建立一个Kafka用户我有这段代码
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddRider(rider =>
{
rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
rider.UsingKafka((context,k) =>
{
k.Host("localhost:9092");
});
});
});
var provider = services.BuildServiceProvider();
var producer = provider.GetRequiredService<ITopicProducer<Request>>();
await producer.Produce(new Request()
{
RequestId = "abc123",RequestedAt = DateTime.UtcNow
});
这是here
中制作人的最简单示例但是当我尝试运行它时,出现此异常
Unhandled exception. System.InvalidOperationException: No service for type 'MassTransit.Registration.IBusInstance' has been registered.
查看他们网站上的示例,我发现这可能与我尚未注册RabbitMQ的事实有关
x.UsingRabbitMq((context,cfg) => cfg.ConfigureEndpoints(context));
但是我没有RabbitMQ,在这种情况下我只使用Kafka。
是否有必要向其他消息代理注册总线以生成到Kafka?
解决方法
从文档中:
MassTransit v7引入的车手提供了一种将消息从任何来源传递到总线的新方法。车手与总线一起配置,并在启动时登上总线。
要添加骑手,必须有一个巴士实例。如果您不需要具有持久性传输的总线(例如RabbitMQ),则可以使用内存中的传输。
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
rider.UsingKafka((context,k) =>
{
k.Host("localhost:9092");
});
});
});
公交需要启动和停止,这也将启动/停止公交上的所有骑手。您可以通过IBusControl
进行此操作:
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(cancellationToken);
或者如果您使用的是ASP.NET Core通用主机,则通过添加MassTransit托管服务。
services.AddMassTransitHostedService(); // in MassTransit.AspNetCore
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。