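How to fix MassTransit IJobConsumer not running jobs
I am trying to use the new IJobConsumer, but the job never runs.
In the project I am testing with, Mediator works fine, and so do regular (short-running) consumers. The only thing I cannot get to work is IJobConsumer, and I am not sure what, if anything, I have misconfigured.
This is how I configure MassTransit: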
services.AddMassTransit(x =>
{
    x.AddServiceClient();
    x.AddRabbitMqMessageScheduler();
    x.UsingRabbitMq((context, configurator) =>
    {
        configurator.UseRabbitMqMessageScheduler();
        configurator.Host(options.AzureServiceBusHostOptions.Host);
        ...
        var serviceInstanceOptions = new ServiceInstanceOptions()
            .EnableInstanceEndpoint();
        configurator.ServiceInstance(serviceInstanceOptions, instanceConfigurator =>
        {
            instanceConfigurator.ConfigureJobServiceEndpoints(serviceConfigurator =>
            {
                serviceConfigurator.FinalizeCompleted = true;
            });
            // Extension method that adds consumers based on my own marker interface
            x.AddJobConsumersFromTypeAssembly(consumerType);
            instanceConfigurator.ConfigureEndpoints(context);
            instanceConfigurator.ReceiveEndpoint("import-products", e =>
            {
                e.ConfigureConsumer<ImportProductsConsumer>(context, c =>
                {
                    c.Options<JobOptions<IImportProducts>>(o => o
                        .SetJobTimeout(TimeSpan.FromMinutes(15))
                        .SetConcurrentJobLimit(10));
                });
            });
        });
    });
});
This is the error I get:
[13:43:14 ERR] R-FAULT rabbitmq://localhost/Job eb810000-857f-0205-07d4-08d8c69e2d13 MassTransit.Contracts.JobService.JobSubmitted MassTransit.JobService.Components.StateMachines.JobSaga(00:00:01.0258821)
Automatonymous.NotAcceptedStateMachineException: MassTransit.JobService.Components.StateMachines.JobSaga(eb810000-857f-0205-47ed-08d8c69e2b1a) Saga exception on receipt of MassTransit.Contracts.JobService.JobSubmitted: Not accepted in state AllocatingJobSlot
---> Automatonymous.UnhandledEventException: The JobSubmitted event is not handled during the AllocatingJobSlot state for the JobStateMachine state machine
at Automatonymous.AutomatonymousStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context)
at Automatonymous.AutomatonymousStateMachine`1.UnhandledEvent(EventContext`1 context,State state)
at Automatonymous.States.StateMachineState`1.Automatonymous.State<TInstance>.Raise[T](EventContext`2 context)
at Automatonymous.AutomatonymousStateMachine`1.Automatonymous.StateMachine<TInstance>.RaiseEvent[T](EventContext`2 context)
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context,IPipe`1 next)
--- End of inner exception stack trace ---
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context,IPipe`1 next)
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context,IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context,IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context,IPipe`1 next)
[13:43:15 WRN] R-RETRY rabbitmq://localhost/Job eb810000-857f-0205-07d4-08d8c69e2d13 MassTransit.Context.RetryConsumeContext<MassTransit.Contracts.JobService.JobSubmitted>
Automatonymous.NotAcceptedStateMachineException: MassTransit.JobService.Components.StateMachines.JobSaga(eb810000-857f-0205-47ed-08d8c69e2b1a) Saga exception on receipt of MassTransit.Contracts.JobService.JobSubmitted: Not accepted in state AllocatingJobSlot
---> Automatonymous.UnhandledEventException: The JobSubmitted event is not handled during the AllocatingJobSlot state for the JobStateMachine state machine
at Automatonymous.AutomatonymousStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context)
at Automatonymous.AutomatonymousStateMachine`1.UnhandledEvent(EventContext`1 context,IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context,IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context,IPipe`1 next)
[13:43:16 ERR] R-FAULT rabbitmq://localhost/Job eb810000-857f-0205-07d4-08d8c69e2d13 MassTransit.Contracts.JobService.JobSubmitted MassTransit.JobService.Components.StateMachines.JobSaga(00:00:00.3542862)
Automatonymous.NotAcceptedStateMachineException: MassTransit.JobService.Components.StateMachines.JobSaga(eb810000-857f-0205-47ed-08d8c69e2b1a) Saga exception on receipt of MassTransit.Contracts.JobService.JobSubmitted: Not accepted in state WaitingToStart
---> Automatonymous.UnhandledEventException: The JobSubmitted event is not handled during the WaitingToStart state for the JobStateMachine state machine
at Automatonymous.AutomatonymousStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context)
at Automatonymous.AutomatonymousStateMachine`1.UnhandledEvent(EventContext`1 context,IPipe`1 next)
[13:43:18 ERR] R-FAULT rabbitmq://localhost/Job eb810000-857f-0205-07d4-08d8c69e2d13 MassTransit.Contracts.JobService.JobSubmitted MassTransit.JobService.Components.StateMachines.JobSaga(00:00:00.3506767)
Automatonymous.NotAcceptedStateMachineException: MassTransit.JobService.Components.StateMachines.JobSaga(eb810000-857f-0205-47ed-08d8c69e2b1a) Saga exception on receipt of MassTransit.Contracts.JobService.JobSubmitted: Not accepted in state WaitingToStart
---> Automatonymous.UnhandledEventException: The JobSubmitted event is not handled during the WaitingToStart state for the JobStateMachine state machine
at Automatonymous.AutomatonymousStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context)
at Automatonymous.AutomatonymousStateMachine`1.UnhandledEvent(EventContext`1 context,IPipe`1 next)
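Solution
You are configuring two endpoints for the job consumer (one through ConfigureEndpoints(context) and one through the explicit ReceiveEndpoint("import-products", ...)), so the work is duplicated.
The following call needs to move up, directly into the AddMassTransit block. The block it currently sits in, the UsingRabbitMq callback, is only called later: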
// Extension method that adds consumers based on my own marker interface
x.AddJobConsumersFromTypeAssembly(consumerType);
You should create a consumer definition for your job consumer and set the job options there:
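class ImportProductsConsumerDefinition :
    ConsumerDefinition<ImportProductsConsumer>
{
    // ConfigureConsumer signature as in MassTransit 7; check it against the version you use.
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ImportProductsConsumer> consumerConfigurator)
    {
        // Job options that were previously set inside the explicit ReceiveEndpoint
        consumerConfigurator.Options<JobOptions<IImportProducts>>(o => o
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    }
}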
Then, simplify your bus configuration:
configurator.ServiceInstance(serviceInstanceOptions, instanceConfigurator =>
{
    instanceConfigurator.ConfigureJobServiceEndpoints(serviceConfigurator =>
    {
        serviceConfigurator.FinalizeCompleted = true;
    });
    instanceConfigurator.ConfigureEndpoints(context);
});
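Putting the pieces together, the whole registration might look roughly like the sketch below. It only rearranges the calls from the question: the consumer registration moves to the top of the AddMassTransit block and the explicit "import-products" receive endpoint is dropped. AddJobConsumersFromTypeAssembly is the asker's own extension method, and whether it also picks up ImportProductsConsumerDefinition is an assumption. Settings that the question elides with "..." are left out here.

```csharp
services.AddMassTransit(x =>
{
    // Register the job consumers first, before the bus is configured.
    // AddJobConsumersFromTypeAssembly is the asker's own extension method;
    // it is assumed here to register the consumer definition as well.
    x.AddJobConsumersFromTypeAssembly(consumerType);

    x.AddServiceClient();
    x.AddRabbitMqMessageScheduler();

    // This callback runs later, when the bus is built.
    x.UsingRabbitMq((context, configurator) =>
    {
        configurator.UseRabbitMqMessageScheduler();
        configurator.Host(options.AzureServiceBusHostOptions.Host);

        var serviceInstanceOptions = new ServiceInstanceOptions()
            .EnableInstanceEndpoint();

        configurator.ServiceInstance(serviceInstanceOptions, instanceConfigurator =>
        {
            instanceConfigurator.ConfigureJobServiceEndpoints(serviceConfigurator =>
            {
                serviceConfigurator.FinalizeCompleted = true;
            });

            // The only place where consumer endpoints are created, so the
            // job consumer ends up on a single endpoint.
            instanceConfigurator.ConfigureEndpoints(context);
        });
    });
});
```

If the extension method does not register definitions, an explicit x.AddConsumer<ImportProductsConsumer>(typeof(ImportProductsConsumerDefinition)) at the same position should serve the same purpose.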