适用于Azure App Service和.NET Core 3.1中长时间运行的计算的合适解决方案?

如何解决适用于Azure App Service和.NET Core 3.1中长时间运行的计算的合适解决方案?

对于不需要数据库且该应用程序外部没有IO的应用程序中的Azure App Service和.NET Core 3.1中长时间运行的计算,什么是合适的解决方案?这是一个计算任务。

具体来说,以下内容不可靠,需要解决方案。

[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
    Debug.Assert(inbound.Message.Equals("Hello server."));
    Outbound outbound = new Outbound();
    long Billion = 1000000000;
    for (long i = 0; i < 33 * Billion; i++) // 230 seconds
        ;
    outbound.Message = String.Format("The server processed inbound object.");
    return outbound;
}

这有时会向HttpClient返回一个空对象(未显示)。较小的工作量将永远成功。例如,30亿次迭代总是成功。更大的数目会很好,特别是需要2400亿。

我认为,到2020年,带有.NET Core的Azure App Service中的合理目标可能是在8个子线程的帮助下将父线程数增加到2400亿,因此每个子数增加到300亿,而父数将将8 M字节的入站对象分成每个孩子入站的较小对象。每个子代接收一个1 M字节的入站,并向父代返回一个1 M字节的出站。父级将结果重新组合成8 M字节的出站。

显然,经过时间将是单线程实现所需时间的12.5%,或1/8,或八分之一。与计算时间相比,切割和重新组装对象的时间短。我假设与计算时间相比,传输对象的时间非常短,因此12.5%的期望值大致准确。

如果我可以获得4或8个内核,那将是很好的。如果我可以获得让我说一个内核周期的50%的线程,那么我可能需要8或16个线程。如果每个线程给我33%的内核周期,那么我将需要12或24个线程。

我正在考虑使用BackgroundService类,但是我想确认这是正确的方法。微软说...

BackgroundService is a base class for implementing a long running IHostedService.

很显然,如果长时间运行某件事,最好通过System.Threading使用多个内核来使其更快完成,但是此documentation似乎仅在启动任务的情况下提及System.Threading通过System.Threading.Timer。我的示例代码显示我的应用程序中不需要计时器。 HTTP POST将作为工作的机会。通常,我会使用System.Threading.Thread实例化多个对象以使用多个内核。我发现在需要花费很长时间的工作解决方案的上下文中,没有提到多个内核是一个明显的遗漏,但是可能是由于某些原因Azure App Service无法处理此问题。也许我只是无法在教程和文档中找到它。

任务的启动是图示的HTTP POST控制器。假设最长的工作需要10分钟。 HTTP客户端(未显示)将超时限制设置为1000秒,这比10分钟(600秒)要多得多,以确保安全。 HttpClient.Timeout是相关属性。目前,我认为HTTP超时是一个真正的限制。而不是某种非绑定(虚假限制),以使某些其他约束导致用户等待9分钟并收到错误消息。实际的绑定限制是一个限制,我可以说“但是对于此超时,它将会成功”。如果HTTP超时不是真正的绑定限制,并且还有其他限制系统的因素,那么我可以调整HTTP控制器,使其具有三(3)个POST方法。因此POST1将意味着使用入站对象启动任务。 POST2的意思是告诉我是否完成。 POST3的意思是给我出站对象。

对于不需要数据库且该应用程序外部没有IO的应用程序中的Azure App Service和.NET Core 3.1中长时间运行的计算,什么是合适的解决方案?这是一个计算任务。

解决方法

序言

几年前,我遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时,处理过程可能需要10秒钟,而其他时候可能需要一个小时。

首先,我们按照您的问题进行了说明:向服务发送请求,服务处理请求中的数据,并在完成后返回响应。

即将发布

当工作只花了大约一分钟或更短的时间,这很好,但是在此之上的任何事情,服务器将关闭会话,并且呼叫者将报告错误。

在放弃请求之前,服务器的默认响应时间约为2分钟。它不会退出请求的处理...但是会退出HTTP会话。在HttpClient上设置什么参数都没有关系,服务器是代表多久的服务器。

问题原因

所有这些都是有充分理由的。服务器套接字极其昂贵。您有有限的余量。服务器试图通过切断比指定时间更长的请求来保护您的服务,以避免套接字短缺的问题。

通常,您希望HTTP请求仅花费几毫秒的时间。如果他们花费的时间比这更长,那么如果您的服务必须高效率地完成其他请求,您最终将遇到套接字问题。

解决方案

我们决定走IHostedService,特别是BackgroundService的路线。我们将此服务与队列一起使用。这样,您可以设置一个作业队列,BackgroundService将一次处理它们(在某些情况下,我们可以同时处理多个队列项目,而在另一些情况下,我们可以水平扩展以生成两个或更多队列)。

为什么ASP.NET Core服务运行BackgroundService?我想在不紧密耦合到任何特定于Azure的构造的情况下进行处理,以防万一我们需要从Azure迁移到其他云服务(在当时,由于当时的其他原因,我们正在考虑这样做)。>

这对我们来说效果很好,此后我们再也没有看到任何问题。

工作流程如下:

  1. 呼叫者使用一些参数向服务发送请求
  2. 服务生成一个“作业”对象,并通过202(接受)响应立即返回ID。
  3. 服务将此作业放入BackgroundService正在维护的队列中
  4. 呼叫者可以查询作业状态,并获取有关使用此作业ID完成了多少工作以及还有多少要做的信息
  5. 服务完成任务,将任务置于“已完成”状态,然后返回等待队列以产生更多任务

请记住,您的服务具有在多个实例在运行的情况下水平扩展的能力。在这种情况下,我使用Redis缓存存储作业的状态,以便所有实例共享相同的状态。

如果没有可用的Redis缓存,我还添加了“内存缓存”选项以在本地测试事物。您可以在服务器上运行“内存缓存”服务,只需知道它可以扩展就可以使数据不一致。

示例

由于我已经与孩子结婚,所以每个人上床睡觉后的星期五晚上我的确做不了什么,所以我花了一些时间整理一个可以尝试的例子。完整的solution也可供您试用。

QueuedBackgroundService.cs

该类实现有两个特定目的:一个是从队列中读取(BackgroundService实现),另一个是写入队列(IQueuedBackgroundService实现)。

public interface IQueuedBackgroundService
{
    Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
}

public sealed class QueuedBackgroundService : BackgroundService,IQueuedBackgroundService
{
    private sealed class JobQueueItem
    {
        public string JobId { get; set; }
        public JobParametersModel JobParameters { get; set; }
    }

    private readonly IComputationWorkService _workService;
    private readonly IComputationJobStatusService _jobStatusService;

    // Shared between BackgroundService and IQueuedBackgroundService.
    // The queueing mechanism could be moved out to a singleton service. I am doing
    // it this way for simplicity's sake.
    private static readonly ConcurrentQueue<JobQueueItem> _queue =
        new ConcurrentQueue<JobQueueItem>();
    private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public QueuedBackgroundService(IComputationWorkService workService,IComputationJobStatusService jobStatusService)
    {
        _workService = workService;
        _jobStatusService = jobStatusService;
    }

    /// <summary>
    /// Transient method via IQueuedBackgroundService
    /// </summary>
    public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
    {
        var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
        _queue.Enqueue(new JobQueueItem { JobId = jobId,JobParameters = jobParameters });
        _signal.Release(); // signal for background service to start working on the job
        return new JobCreatedModel { JobId = jobId,QueuePosition = _queue.Count };
    }

    /// <summary>
    /// Long running task via BackgroundService
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while(!stoppingToken.IsCancellationRequested)
        {
            JobQueueItem jobQueueItem = null;
            try
            {
                // wait for the queue to signal there is something that needs to be done
                await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);

                // dequeue the item
                jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;

                if(jobQueueItem != null)
                {
                    // put the job in to a "processing" state
                    await _jobStatusService.UpdateJobStatusAsync(
                        jobQueueItem.JobId,JobStatus.Processing).ConfigureAwait(false);

                    // the heavy lifting is done here...
                    var result = await _workService.DoWorkAsync(
                        jobQueueItem.JobId,jobQueueItem.JobParameters,stoppingToken).ConfigureAwait(false);

                    // store the result of the work and set the status to "finished"
                    await _jobStatusService.StoreJobResultAsync(
                        jobQueueItem.JobId,result,JobStatus.Success).ConfigureAwait(false);
                }
            }
            catch(TaskCanceledException)
            {
                break;
            }
            catch(Exception ex)
            {
                try
                {
                    // something went wrong. Put the job in to an errored state and continue on
                    await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId,new JobResultModel
                    {
                        Exception = new JobExceptionModel(ex)
                    },JobStatus.Errored).ConfigureAwait(false);
                }
                catch(Exception)
                {
                    // TODO: log this
                }
            }
        }
    }
}

它是这样注入的:

services.AddHostedService<QueuedBackgroundService>();
services.AddTransient<IQueuedBackgroundService,QueuedBackgroundService>();

ComputationController.cs

用于读取/写入作业的控制器如下所示:

[ApiController,Route("api/[controller]")]
public class ComputationController : ControllerBase
{
    private readonly IQueuedBackgroundService _queuedBackgroundService;
    private readonly IComputationJobStatusService _computationJobStatusService;

    public ComputationController(
        IQueuedBackgroundService queuedBackgroundService,IComputationJobStatusService computationJobStatusService)
    {
        _queuedBackgroundService = queuedBackgroundService;
        _computationJobStatusService = computationJobStatusService;
    }

    [HttpPost,Route("beginComputation")]
    [ProducesResponseType(StatusCodes.Status202Accepted,Type = typeof(JobCreatedModel))]
    public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
    {
        return Accepted(
            await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
    }

    [HttpGet,Route("computationStatus/{jobId}")]
    [ProducesResponseType(StatusCodes.Status200OK,Type = typeof(JobModel))]
    [ProducesResponseType(StatusCodes.Status404NotFound,Type = typeof(string))]
    public async Task<IActionResult> GetComputationResultAsync(string jobId)
    {
        var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
        if(job != null)
        {
            return Ok(job);
        }
        return NotFound($"Job with ID `{jobId}` not found");
    }

    [HttpGet,Route("getAllJobs")]
    [ProducesResponseType(StatusCodes.Status200OK,Type = typeof(IReadOnlyDictionary<string,JobModel>))]
    public async Task<IActionResult> GetAllJobsAsync()
    {
        return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
    }

    [HttpDelete,Route("clearAllJobs")]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status401Unauthorized)]
    public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
    {
        if(permission == "this is flakey security so this can be run as a public demo")
        {
            await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
            return Ok();
        }
        return Unauthorized();
    }
}

工作示例

只要该问题仍然有效,我将maintain a working example进行尝试。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每个迭代为1秒。因此,如果将迭代值设置为60,它将使该作业运行60秒。

在运行时,运行computationStatus/{jobId}getAllJobs端点。您可以观看所有作业的实时更新。

该示例远非一个功能齐全,涵盖所有边缘情况,全面成熟,可立即投入生产的示例,但这是一个好的开始。

结论

在后端工作了几年之后,我看到了很多问题,因为他们不了解后端的所有“规则”。希望这个答案能对我过去遇到的问题有所启发,并希望这使您不必再面对上述问题。

,

一种选择是尝试Azure Durable Functions,它更适合长期运行的作业,这些作业需要检查点和状态,以防止在触发请求的上下文中尝试完成。它还具有扇出/扇入的概念,以防您要描述的内容被分成总结果较小的工作。

如果仅以原始计算为目标,Azure Batch可能会是一个更好的选择,因为它有助于扩展。

,

我假设需要完成的实际工作不是遍历一个没有执行任何操作的循环,因此就可能的并行化而言,我现在不能提供太多帮助。是工作CPU密集型还是IO相关?

对于Azure应用服务中的长期运行,选项之一是使用Web Job。一种可能的解决方案是将计算请求发布到队列(Storage QueueAzure Message Bus Queues)中。然后,网络作业将处理这些消息,并可能将新消息放入另一个队列,供请求者用来处理结果。

如果保证处理所需的时间少于10分钟,则可以用Queue Triggered Azure Function替换Web Job。这是Azure上的无服务器产品,具有很大的扩展可能性。

另一个选择确实是使用Service WorkerIHostingService的实例,并在那里进行一些队列处理。

,

由于您要说的是您的计算以较少的迭代次数成功,因此一个简单的解决方案是简单地定期保存结果并恢复计算。

例如,假设您需要执行2,400亿次迭代,并且您知道要可靠执行的最高迭代次数是30亿次迭代,那么我将进行以下设置:

  1. 一个实际执行任务的奴隶(2400亿次迭代)
  2. 主设备,定期接收来自从设备的进度信息。

从设备可以定期向主机发送消息(例如,每20亿次迭代一次)。如果计算中断,此消息可能包含与恢复计算有关的所有内容。

  1. 主机应跟踪从机。如果主服务器确定从服务器已死亡/崩溃或发生任何故障,则主服务器应简单地创建一个新的从计算机,该新的从计算机应从上次报告的位置恢复计算。

您如何精确地实现主控和从属取决于您的个人喜好。

如果您可以将计算拆分到各个节点上,则不是让单个循环执行2400亿次迭代,而是尝试同时在尽可能多的节点上并行计算解决方案。

我个人将node.js用于多核项目。尽管您使用的是asp.net,但我还是使用了node.js的示例来说明适用于我的体系结构。

Node.js on multi-core machines

https://dzone.com/articles/multicore-programming-in-nodejs

正如Noah Stahl在回答中提到的那样,Azure耐用功能和Azure批处理似乎是帮助您实现平台目标的选项。请查看他的答案以获取更多详细信息。

,

标准答案是使用异步消息传递。我有一个blog series on the topic。尤其是这种情况,因为您已经在 Azure 中。

您已经拥有 Azure Web 应用服务,但现在想要在请求的外部运行代码 - “请求外部代码”。运行该代码的正确方法是在单独的进程中 - Azure Functions 或 Azure WebJobs 是 good match for Azure webapps

首先,您需要一个持久的队列。 Azure 存储队列非常适合,因为您无论如何都在 Azure 中。然后你的 webapi 可以将一条消息写入队列并返回。这里的重要部分是这是一个 durable queue,not an in-memory queue

与此同时,Azure Function/WebJob 是 processing that queue。它将从队列中取出工作并执行它。

拼图的最后一块是 completion notification。这是一种非常常见的方法:

我可以将我的 HTTP 控制器调整为具有三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。 POST2 的意思是告诉我它是否完成了。 POST3 表示给我出站对象。

为此,您的后台处理器应将“进行中”/“完成/结果”状态保存在 webapi 进程可以访问的某个位置。如果您已经有一个共享数据库(并且保留结果是有意义的),那么这可能是最简单的选择。我还会考虑使用 Azure Cosmos DB,它有一个很好的生存时间设置,因此后台服务可以注入“24 小时有效”或其他什么的结果,之后它们会被自动清理。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-