如何解决MassTransit JobConsumer示例由于超时异常而失败
MassTransit到7.0.4。我遇到了一个异常,尝试按以下说明演示最基本的Job Consumer场景:https://masstransit-project.com/advanced/job-consumers.html。异常消息是:
Timeout waiting for response,RequestId: 4ec50000-294e-000c-71b6-08d8768d0bc0
我怀疑该示例可能缺少导体配置。
我有3个项目的解决方案。 Project TestJob拥有ConvertVideo
接口,可在客户端和使用者之间共享:
namespace JobSystem.Jobs
{
using System;
public interface ConvertVideo
{
Guid VideoId { get; }
string Format { get; }
}
}
项目TestJobClient具有单个类Program。对GetResponse的调用中出现了异常。
namespace JobSystemClient
{
using System;
using System.Threading;
using System.Threading.Tasks;
using JobSystem.Jobs;
using MassTransit;
using MassTransit.Contracts.JobService;
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
// Not in sample,I added this
cfg.Host("localhost");
});
var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await busControl.StartAsync(source.Token);
try
{
var serviceClient = busControl.CreateServiceClient();
var requestClient = serviceClient.CreateRequestClient<ConvertVideo>();
do
{
string value = await Task.Run(() =>
{
Console.WriteLine("Enter video format (or quit to exit)");
Console.Write("> ");
return Console.ReadLine();
});
if ("quit".Equals(value,StringComparison.OrdinalIgnoreCase))
break;
// Exception comes up here!
var response = await requestClient.GetResponse<JobSubmissionAccepted>(new
{
VideoId = NewId.NextGuid(),Format = value
});
}
while (true);
}
finally
{
await busControl.StopAsync();
}
}
}
}
项目TestJobConsumer具有两个类,即Program和ConvertVideoJobConsumer。我在这里添加了一些代码来启动总线,并确保它保持活动状态以处理请求。
namespace JobSystemConsoleService
{
using System;
using System.Threading.Tasks;
using JobSystem.Jobs;
using MassTransit;
using MassTransit.Conductor;
using MassTransit.JobService;
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
// Not in sample,I added this
cfg.Host("localhost");
var options = new ServiceInstanceOptions()
.EnableInstanceEndpoint();
cfg.ServiceInstance(options,instance =>
{
instance.ConfigureJobServiceEndpoints();
var queueName = instance.EndpointNameFormatter.Consumer<ConvertVideoJobConsumer>();
instance.ReceiveEndpoint(queueName,e =>
{
e.Consumer(() => new ConvertVideoJobConsumer(),c =>
{
// Note: edited this line from options to opts,as
// the example has a compile-time error due to options
// in already existing one of the parent scopes
c.Options<JobOptions<ConvertVideo>>(opts => opts
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(10));
});
});
});
});
await busControl.StartAsync();
Console.WriteLine("Hit <Enter> to quit.");
Console.Write("> ");
Console.ReadLine();
await busControl.StopAsync();
}
public class ConvertVideoJobConsumer :
IJobConsumer<ConvertVideo>
{
public async Task Run(JobContext<ConvertVideo> context)
{
// simulate converting the video
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。