How to solve the unit-of-work problem in a TPL Dataflow pipeline
I have a classic producer/consumer problem: multiple users can post data to a Web API method (api/test) concurrently, and each request triggers a time-consuming asynchronous IO operation. I use a BufferBlock linked to an ActionBlock to limit the number of concurrent requests to 5. The Producer class is registered as a singleton, so that all calls to api/test feed into this one queue; that means things like completing the blocks are not viable. What is the most efficient way for the controller to await completion of the work it started?

Web API controller:
[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private Producer producer;

    public TestController(Producer producer)
    {
        this.producer = producer;
    }

    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }

        // I've added my work to the queue, elegant completion required
        return new string[] { "value1", "value2" };
    }
}
Solution
There are many approaches and synchronization primitives you can use to solve this, each with its own benefits, fault tolerance, and problems depending on your needs. Here is one that uses an event to complete a TaskCompletionSource you can await; you could easily refactor it to do the send and the await in one call.
public class Producer
{
    private BufferBlock<int> _queue;
    private ActionBlock<int> _consumer;

    public Action<int, string> OnResult;

    public Producer()
    {
        InitializeChain();
    }

    private void InitializeChain()
    {
        _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };

        _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);
        _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }

    private async Task SomeIoWorkAsync(int x)
    {
        Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
        await Task.Delay(5000);
        OnResult?.Invoke(x, $"SomeResult {x}");
    }

    public Task AddAsync(int data) => _queue.SendAsync(data);
}
Usage
public static Task<string> WaitForConsumerAsync(Producer producer, int myId)
{
    var tcs = new TaskCompletionSource<string>();

    producer.OnResult += (id, result) =>
    {
        if (id == myId)
            tcs.TrySetResult(result);
    };

    return tcs.Task;
}
var producer = new Producer();

// to simulate something you are waiting for, an id or whatever
var myId = 7;

// you could send and await in the same method if needed, this is just an example
var task = WaitForConsumerAsync(producer, myId);

// create random work for the bounded capacity to fill up
// run this as a task so we don't hit the back pressure before we await (just for this test)
Task.Run(async () =>
{
    for (int i = 1; i <= 20; i++)
        await producer.AddAsync(i);
});

// wait for your results to pop out
var result = await task;
Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now I can finish happily");

// you can happily end here, the pipeline will keep going
Console.ReadKey();
Note: you may have to play with the example so that it doesn't time out.

Output
12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7,now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20
Additional notes

To do the send and await in one call:

public async Task<string> AddAsync(int data)
{
    await _queue.SendAsync(data);
    return await WaitForConsumerAsync(data);
}

public Task<string> WaitForConsumerAsync(int data)
{
    var tcs = new TaskCompletionSource<string>();

    OnResult += (id, result) =>
    {
        if (id == data)
            tcs.TrySetResult(result);
    };

    return tcs.Task;
}

This is really just an academic example of using an event. I am assuming your pipeline is more complicated than the example given and that you are mixing CPU- and IO-bound workloads; it also assumes you actually need an awaitable here, which is otherwise redundant.
- If you are awaiting purely IO workloads, you may be better off just awaiting them directly; there is no need for the pipeline.
- From the information you have given, you don't really need the BufferBlock to produce back pressure unless you have some sort of memory constraint.
- Take note of BoundedCapacity and the default EnsureOrdered = true. The pipeline will be more efficient with EnsureOrdered = false: jobs pop out as they finish, and back pressure is not affected by the ordering of results, meaning items can likely move through the pipeline faster.
- You could also look at other frameworks such as Rx, which may make all of this more elegant and fluent.