如何解决将 MassTransit 与 Kafka 和内存中总线一起使用时,应用程序退出期间出现 AccessViolationException
我将 MassTransit 7.1.4 版与 .Net Core 3.1、ASP.NET Core Web 应用程序一起使用。当我通过 Ctrl + C 关闭应用程序时会发生这种情况。当我按下停止调试按钮时,它不会发生。负责注册的代码如下:
private void ConfigureMassTransit(IServiceCollection services)
{
services.AddMassTransit(massTransit =>
{
massTransit.UsingInMemory((ctx,cfg)
=> cfg.ConfigureEndpoints(ctx,SnakeCaseEndpointNameFormatter.Instance));
massTransit.ConfigureRider(
_kafkaHost,_dbConnectionString);
});
services.AddMassTransitHostedService();
}
// The rider part.
rider.UsingKafka((ctx,kafka) =>
{
kafka.Host(kafkaHost);
kafka.TopicEndpoint<Null,IMessageName>(topicName,GroupIds.PushedDeals,cfg =>
{
cfg.CheckpointInterval = TimeSpan.FromMilliseconds(100);
cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
cfg.ConfigureSaga<SagaState>(ctx);
});
});
例外:
Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr,IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr,IntPtr)
at Confluent.Kafka.Consumer`2[[System.__Canon,System.Private.CoreLib,Version=4.0.0.0,Culture=neutral,PublicKeyToken=7cec85d7bea7798e],[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
at Confluent.Kafka.Consumer`2[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2+<>c__DisplayClass15_0[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].<Consume>b__0()
at MassTransit.Util.ChannelExecutor+SynchronousFuture`1[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Run()
at MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12.MoveNext()
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult,[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12,MassTransit,Version=7.1.4.0,PublicKeyToken=b8e0e9f2f1e657fa]].ExecutionContextCallback(System.Object)
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext,System.Threading.ContextCallback,System.Object)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult,PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext(System.Threading.Thread)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult,PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext()
at System.Threading.ThreadPoolGlobals+<>c.<.cctor>b__5_0(System.Object)
at System.Threading.Channels.AsyncOperation`1[[System.Boolean,PublicKeyToken=7cec85d7bea7798e]].SetCompletionAndInvokeContinuation()
at System.Threading.Channels.AsyncOperation`1[[System.Boolean,PublicKeyToken=7cec85d7bea7798e]].SignalCompletion()
at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].TryWrite(System.__Canon)
at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].WriteAsync(System.__Canon,System.Threading.CancellationToken)
at MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].MoveNext()
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]],PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon,PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
at MassTransit.Util.ChannelExecutor.Run[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]](System.Func`1<System.__Canon>,System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.SharedConsumerContext`2[[System.__Canon,PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
当我注释掉主题注册或使用 RabbitMq 作为总线时,它已停止发生。我知道内存总线主要是为了测试,但是有一个现有的 Kafka 基础设施,所以运行另一个基础设施是没有意义的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。