如何解决适用于Azure App Service和.NET Core 3.1中长时间运行的计算的合适解决方案?
对于不需要数据库且该应用程序外部没有IO的应用程序中的Azure App Service和.NET Core 3.1中长时间运行的计算,什么是合适的解决方案?这是一个计算任务。
具体来说,以下内容不可靠,需要解决方案。
[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
Debug.Assert(inbound.Message.Equals("Hello server."));
Outbound outbound = new Outbound();
long Billion = 1000000000;
for (long i = 0; i < 33 * Billion; i++) // 230 seconds
;
outbound.Message = String.Format("The server processed inbound object.");
return outbound;
}
这有时会向HttpClient
返回一个空对象(未显示)。较小的工作量将永远成功。例如,30亿次迭代总是成功。更大的数目会很好,特别是需要2400亿。
我认为,到2020年,带有.NET Core的Azure App Service中的合理目标可能是在8个子线程的帮助下将父线程数增加到2400亿,因此每个子数增加到300亿,而父数将将8 M字节的入站对象分成每个孩子入站的较小对象。每个子代接收一个1 M字节的入站,并向父代返回一个1 M字节的出站。父级将结果重新组合成8 M字节的出站。
显然,经过时间将是单线程实现所需时间的12.5%,或1/8,或八分之一。与计算时间相比,切割和重新组装对象的时间短。我假设与计算时间相比,传输对象的时间非常短,因此12.5%的期望值大致准确。
如果我可以获得4或8个内核,那将是很好的。如果我可以获得让我说一个内核周期的50%的线程,那么我可能需要8或16个线程。如果每个线程给我33%的内核周期,那么我将需要12或24个线程。
我正在考虑使用BackgroundService
类,但是我想确认这是正确的方法。微软说...
BackgroundService is a base class for implementing a long running IHostedService.
很显然,如果长时间运行某件事,最好通过System.Threading
使用多个内核来使其更快完成,但是此documentation似乎仅在启动任务的情况下提及System.Threading
通过System.Threading.Timer
。我的示例代码显示我的应用程序中不需要计时器。 HTTP POST将作为工作的机会。通常,我会使用System.Threading.Thread
实例化多个对象以使用多个内核。我发现在需要花费很长时间的工作解决方案的上下文中,没有提到多个内核是一个明显的遗漏,但是可能是由于某些原因Azure App Service无法处理此问题。也许我只是无法在教程和文档中找到它。
任务的启动是图示的HTTP POST控制器。假设最长的工作需要10分钟。 HTTP客户端(未显示)将超时限制设置为1000秒,这比10分钟(600秒)要多得多,以确保安全。 HttpClient.Timeout
是相关属性。目前,我认为HTTP超时是一个真正的限制。而不是某种非绑定(虚假限制),以使某些其他约束导致用户等待9分钟并收到错误消息。实际的绑定限制是一个限制,我可以说“但是对于此超时,它将会成功”。如果HTTP超时不是真正的绑定限制,并且还有其他限制系统的因素,那么我可以调整HTTP控制器,使其具有三(3)个POST方法。因此POST1将意味着使用入站对象启动任务。 POST2的意思是告诉我是否完成。 POST3的意思是给我出站对象。
对于不需要数据库且该应用程序外部没有IO的应用程序中的Azure App Service和.NET Core 3.1中长时间运行的计算,什么是合适的解决方案?这是一个计算任务。
解决方法
序言
几年前,我遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时,处理过程可能需要10秒钟,而其他时候可能需要一个小时。
首先,我们按照您的问题进行了说明:向服务发送请求,服务处理请求中的数据,并在完成后返回响应。
即将发布
当工作只花了大约一分钟或更短的时间,这很好,但是在此之上的任何事情,服务器将关闭会话,并且呼叫者将报告错误。
在放弃请求之前,服务器的默认响应时间约为2分钟。它不会退出请求的处理...但是会退出HTTP会话。在HttpClient
上设置什么参数都没有关系,服务器是代表太多久的服务器。
问题原因
所有这些都是有充分理由的。服务器套接字极其昂贵。您有有限的余量。服务器试图通过切断比指定时间更长的请求来保护您的服务,以避免套接字短缺的问题。
通常,您希望HTTP请求仅花费几毫秒的时间。如果他们花费的时间比这更长,那么如果您的服务必须高效率地完成其他请求,您最终将遇到套接字问题。
解决方案
我们决定走IHostedService
,特别是BackgroundService
的路线。我们将此服务与队列一起使用。这样,您可以设置一个作业队列,BackgroundService
将一次处理它们(在某些情况下,我们可以同时处理多个队列项目,而在另一些情况下,我们可以水平扩展以生成两个或更多队列)。
为什么ASP.NET Core服务运行BackgroundService
?我想在不紧密耦合到任何特定于Azure的构造的情况下进行处理,以防万一我们需要从Azure迁移到其他云服务(在当时,由于当时的其他原因,我们正在考虑这样做)。>
这对我们来说效果很好,此后我们再也没有看到任何问题。
工作流程如下:
- 呼叫者使用一些参数向服务发送请求
- 服务生成一个“作业”对象,并通过202(接受)响应立即返回ID。
- 服务将此作业放入
BackgroundService
正在维护的队列中 - 呼叫者可以查询作业状态,并获取有关使用此作业ID完成了多少工作以及还有多少要做的信息
- 服务完成任务,将任务置于“已完成”状态,然后返回等待队列以产生更多任务
请记住,您的服务具有在多个实例在运行的情况下水平扩展的能力。在这种情况下,我使用Redis缓存存储作业的状态,以便所有实例共享相同的状态。
如果没有可用的Redis缓存,我还添加了“内存缓存”选项以在本地测试事物。您可以在服务器上运行“内存缓存”服务,只需知道它可以扩展就可以使数据不一致。
示例
由于我已经与孩子结婚,所以每个人上床睡觉后的星期五晚上我的确做不了什么,所以我花了一些时间整理一个可以尝试的例子。完整的solution也可供您试用。
QueuedBackgroundService.cs
该类实现有两个特定目的:一个是从队列中读取(BackgroundService
实现),另一个是写入队列(IQueuedBackgroundService
实现)。
public interface IQueuedBackgroundService
{
Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
}
public sealed class QueuedBackgroundService : BackgroundService,IQueuedBackgroundService
{
private sealed class JobQueueItem
{
public string JobId { get; set; }
public JobParametersModel JobParameters { get; set; }
}
private readonly IComputationWorkService _workService;
private readonly IComputationJobStatusService _jobStatusService;
// Shared between BackgroundService and IQueuedBackgroundService.
// The queueing mechanism could be moved out to a singleton service. I am doing
// it this way for simplicity's sake.
private static readonly ConcurrentQueue<JobQueueItem> _queue =
new ConcurrentQueue<JobQueueItem>();
private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
public QueuedBackgroundService(IComputationWorkService workService,IComputationJobStatusService jobStatusService)
{
_workService = workService;
_jobStatusService = jobStatusService;
}
/// <summary>
/// Transient method via IQueuedBackgroundService
/// </summary>
public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
{
var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
_queue.Enqueue(new JobQueueItem { JobId = jobId,JobParameters = jobParameters });
_signal.Release(); // signal for background service to start working on the job
return new JobCreatedModel { JobId = jobId,QueuePosition = _queue.Count };
}
/// <summary>
/// Long running task via BackgroundService
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while(!stoppingToken.IsCancellationRequested)
{
JobQueueItem jobQueueItem = null;
try
{
// wait for the queue to signal there is something that needs to be done
await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);
// dequeue the item
jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;
if(jobQueueItem != null)
{
// put the job in to a "processing" state
await _jobStatusService.UpdateJobStatusAsync(
jobQueueItem.JobId,JobStatus.Processing).ConfigureAwait(false);
// the heavy lifting is done here...
var result = await _workService.DoWorkAsync(
jobQueueItem.JobId,jobQueueItem.JobParameters,stoppingToken).ConfigureAwait(false);
// store the result of the work and set the status to "finished"
await _jobStatusService.StoreJobResultAsync(
jobQueueItem.JobId,result,JobStatus.Success).ConfigureAwait(false);
}
}
catch(TaskCanceledException)
{
break;
}
catch(Exception ex)
{
try
{
// something went wrong. Put the job in to an errored state and continue on
await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId,new JobResultModel
{
Exception = new JobExceptionModel(ex)
},JobStatus.Errored).ConfigureAwait(false);
}
catch(Exception)
{
// TODO: log this
}
}
}
}
}
它是这样注入的:
services.AddHostedService<QueuedBackgroundService>();
services.AddTransient<IQueuedBackgroundService,QueuedBackgroundService>();
ComputationController.cs
用于读取/写入作业的控制器如下所示:
[ApiController,Route("api/[controller]")]
public class ComputationController : ControllerBase
{
private readonly IQueuedBackgroundService _queuedBackgroundService;
private readonly IComputationJobStatusService _computationJobStatusService;
public ComputationController(
IQueuedBackgroundService queuedBackgroundService,IComputationJobStatusService computationJobStatusService)
{
_queuedBackgroundService = queuedBackgroundService;
_computationJobStatusService = computationJobStatusService;
}
[HttpPost,Route("beginComputation")]
[ProducesResponseType(StatusCodes.Status202Accepted,Type = typeof(JobCreatedModel))]
public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
{
return Accepted(
await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
}
[HttpGet,Route("computationStatus/{jobId}")]
[ProducesResponseType(StatusCodes.Status200OK,Type = typeof(JobModel))]
[ProducesResponseType(StatusCodes.Status404NotFound,Type = typeof(string))]
public async Task<IActionResult> GetComputationResultAsync(string jobId)
{
var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
if(job != null)
{
return Ok(job);
}
return NotFound($"Job with ID `{jobId}` not found");
}
[HttpGet,Route("getAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK,Type = typeof(IReadOnlyDictionary<string,JobModel>))]
public async Task<IActionResult> GetAllJobsAsync()
{
return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
}
[HttpDelete,Route("clearAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
{
if(permission == "this is flakey security so this can be run as a public demo")
{
await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
return Ok();
}
return Unauthorized();
}
}
工作示例
只要该问题仍然有效,我将maintain a working example进行尝试。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每个迭代为1秒。因此,如果将迭代值设置为60,它将使该作业运行60秒。
在运行时,运行computationStatus/{jobId}
或getAllJobs
端点。您可以观看所有作业的实时更新。
该示例远非一个功能齐全,涵盖所有边缘情况,全面成熟,可立即投入生产的示例,但这是一个好的开始。
结论
在后端工作了几年之后,我看到了很多问题,因为他们不了解后端的所有“规则”。希望这个答案能对我过去遇到的问题有所启发,并希望这使您不必再面对上述问题。
,一种选择是尝试Azure Durable Functions,它更适合长期运行的作业,这些作业需要检查点和状态,以防止在触发请求的上下文中尝试完成。它还具有扇出/扇入的概念,以防您要描述的内容被分成总结果较小的工作。
如果仅以原始计算为目标,Azure Batch可能会是一个更好的选择,因为它有助于扩展。
,我假设需要完成的实际工作不是遍历一个没有执行任何操作的循环,因此就可能的并行化而言,我现在不能提供太多帮助。是工作CPU密集型还是IO相关?
对于Azure应用服务中的长期运行,选项之一是使用Web Job。一种可能的解决方案是将计算请求发布到队列(Storage Queue或Azure Message Bus Queues)中。然后,网络作业将处理这些消息,并可能将新消息放入另一个队列,供请求者用来处理结果。
如果保证处理所需的时间少于10分钟,则可以用Queue Triggered Azure Function替换Web Job。这是Azure上的无服务器产品,具有很大的扩展可能性。
另一个选择确实是使用Service Worker或IHostingService的实例,并在那里进行一些队列处理。
,由于您要说的是您的计算以较少的迭代次数成功,因此一个简单的解决方案是简单地定期保存结果并恢复计算。
例如,假设您需要执行2,400亿次迭代,并且您知道要可靠执行的最高迭代次数是30亿次迭代,那么我将进行以下设置:
- 一个实际执行任务的奴隶(2400亿次迭代)
- 主设备,定期接收来自从设备的进度信息。
从设备可以定期向主机发送消息(例如,每20亿次迭代一次)。如果计算中断,此消息可能包含与恢复计算有关的所有内容。
- 主机应跟踪从机。如果主服务器确定从服务器已死亡/崩溃或发生任何故障,则主服务器应简单地创建一个新的从计算机,该新的从计算机应从上次报告的位置恢复计算。
您如何精确地实现主控和从属取决于您的个人喜好。
如果您可以将计算拆分到各个节点上,则不是让单个循环执行2400亿次迭代,而是尝试同时在尽可能多的节点上并行计算解决方案。
我个人将node.js用于多核项目。尽管您使用的是asp.net,但我还是使用了node.js的示例来说明适用于我的体系结构。
Node.js on multi-core machines
https://dzone.com/articles/multicore-programming-in-nodejs
正如Noah Stahl在回答中提到的那样,Azure耐用功能和Azure批处理似乎是帮助您实现平台目标的选项。请查看他的答案以获取更多详细信息。
,标准答案是使用异步消息传递。我有一个blog series on the topic。尤其是这种情况,因为您已经在 Azure 中。
您已经拥有 Azure Web 应用服务,但现在想要在请求的外部运行代码 - “请求外部代码”。运行该代码的正确方法是在单独的进程中 - Azure Functions 或 Azure WebJobs 是 good match for Azure webapps。
首先,您需要一个持久的队列。 Azure 存储队列非常适合,因为您无论如何都在 Azure 中。然后你的 webapi 可以将一条消息写入队列并返回。这里的重要部分是这是一个 durable queue,not an in-memory queue。
与此同时,Azure Function/WebJob 是 processing that queue。它将从队列中取出工作并执行它。
拼图的最后一块是 completion notification。这是一种非常常见的方法:
我可以将我的 HTTP 控制器调整为具有三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。 POST2 的意思是告诉我它是否完成了。 POST3 表示给我出站对象。
为此,您的后台处理器应将“进行中”/“完成/结果”状态保存在 webapi 进程可以访问的某个位置。如果您已经有一个共享数据库(并且保留结果是有意义的),那么这可能是最简单的选择。我还会考虑使用 Azure Cosmos DB,它有一个很好的生存时间设置,因此后台服务可以注入“24 小时有效”或其他什么的结果,之后它们会被自动清理。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。