TPL数据流管道中的工作单元问题

如何解决TPL数据流管道中的工作单元问题

我有一个经典的生产者消费者问题,多个用户可以同时将数据发布到Web API方法(api / test),这会触发耗费大量时间的 IO 异步运行操作。我使用链接到ActionBlock的{​​{1}}将并发请求的数量限制为5。

BufferBlock类注册为单例,目标是允许所有对api / test的调用都馈入此队列。这意味着诸如完成阻止之类的事情是不可行的。

等待控制器完成启动工作的最有效方法是什么?

Web API控制器:

Producer

生产者/消费者实现:

[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }
    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }

        // i've added my work to the queue,elegant completion required
        return new string[] { "value1","value2" };
    }

}

解决方法

因此,您可以使用许多方法和同步原语来解决此问题,每种方法都有自己的优势,容错能力和根据您的需求的问题。这是带有"""Program to track specific location of turtle (for every step)""" from turtle import * from math import * def cball_graphics(): leonardo = Turtle() leonardo.color("dark blue") leonardo.shape("turtle") leonardo.speed(1) return leonardo def show_position(): pos = Turtle() pos.color("white") pos.goto(30,-50) pos.color("red") return pos class cannon_ball: def __init__(self,angle,vel,height,time): self.x_pos = 0 self.y_pos = height theta = pi * angle / 180 self.x_vel = vel * cos(theta) self.y_vel = vel * sin(theta) self.time = time def update_time(self): self.x_pos += self.time * self.x_vel y_vel1 = self.y_vel - 9.8 * self.time self.y_pos += self.time * (self.y_vel + y_vel1) / 2 self.y_vel = y_vel1 def get_x(self): return self.x_pos def get_y(self): return self.y_pos def variables(): angle = 55 vel = 10 height = 100 time = .01 return cannon_ball(angle,time) def main(): leonardo = cball_graphics() """pos is a variable that writes the position on the screen using x and y pos""" pos = show_position() pos.hideturtle() projectile = variables() while projectile.y_pos >= 0: pos.write(f"{'%0.0f' % projectile.x_pos},{'%0.0f' % projectile.y_pos}") projectile.update_time() leonardo.goto(projectile.x_pos,projectile.y_pos) pos.clear() main()

awaitable 示例

给予

TaskCompletionSource

等待中

您可以轻松地重构它以进行发送并在一个呼叫中等待。

public class Producer
{
   private BufferBlock<int> _queue;
   private ActionBlock<int> _consumer;
   public Action<int,string> OnResult;
   public Producer()
   {
      InitializeChain();
   }
   private void InitializeChain()
   {
      _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
      var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5,MaxDegreeOfParallelism = 5 };    
      _consumer = new ActionBlock<int>(SomeIoWorkAsync,consumerOptions);   
      _queue.LinkTo(_consumer,new DataflowLinkOptions { PropagateCompletion = true });
   }

   private async Task SomeIoWorkAsync(int x)
   {
      Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
      await Task.Delay(5000);
      OnResult?.Invoke(x,$"SomeResult {x}");
   }

   public Task AddAsync(int data) => _queue.SendAsync(data);
}

用法

public static Task<string> WaitForConsumerAsync(Producer producer,int myId)
{
   var tcs = new TaskCompletionSource<string>();

   producer.OnResult += (id,result) =>
   {
      if(id == myId)
         tcs.TrySetResult(result);
   };

   return tcs.Task;
}

输出

var producer = new Producer();

// to simulate something you are waiting for,and id or what ever
var myId = 7;

// you could send and await in the same method if needed. this is just an example
var task = WaitForConsumerAsync(producer,myId);

// create random work for the bounded capacity to fill up
// run this as a task so we don't hit the back pressure before we await (just for this test)
Task.Run(async () =>
{
   for (int i = 1; i <= 20; i++)
      await producer.AddAsync(i);
});

// wait for your results to pop out
var result = await task;

Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result},now i can finish happily");

// you can happily end here,the pipeline will keep going
Console.ReadKey();

Full Demo Here

注意:您可能需要使用该示例,这样它才不会超时

一次完成所有操作的示例

12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7,now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20

附加说明

这实际上只是public async Task<string> AddAsync(int data) { await _queue.SendAsync(data); return await WaitForConsumerAsync(data); } public Task<string> WaitForConsumerAsync(int data) { var tcs = new TaskCompletionSource<string>(); OnResult += (id,result) => { if (id == data) tcs.TrySetResult(result); }; return tcs.Task; } 事件的一个学术示例。我假设您的管道比给定的示例还要复杂,并且您正在执行 CPU和IO绑定的工作负载,此外,在此示例中您实际上需要awaitable,这是多余的。 / p>

  1. 如果您正在等待纯粹的 IO工作负载,则最好等待它们,不需要管道。
  2. 在您提供的信息中,除非您有某种内存限制,否则实际上不需要使用BufferBlock产生背压。
  3. 您需要注意BoundedCapacity和默认的BoundedCapacity。使用EnsureOrdered = true,管道将更加有效。作业完成后会弹出,并且背压不会受到结果排序的影响,这意味着项目可能会更快地通过管道进行处理
  4. 您还可以使用其他框架,例如RX,这可能会使所有这些框架更加优雅和流畅。
  5. 通过将EnsureOrdered = false设置为线性块,还可以提高效率

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-