如何解决实现异步“优先级”分块的最佳方法是什么?
我能找到的最接近的实现是aiostream的chunks的实现。这样就可以“从异步序列中生成大小为n的块。这些块是列表,最后一个块可能包含少于n个元素”。
我已经实现了类似的功能,但主要区别在于,它优先一次完成一个批次,而不是一次完成多个批次。
import asyncio
import aiostream
from collections import deque
class IterableAsyncQueue:
def __init__(self):
self.queue = asyncio.Queue()
async def put(self,value):
await self.queue.put(value)
def __aiter__(self):
return self
async def __anext__(self):
return await self.queue.get()
class Batch:
def __init__(self,n):
self.batch_size = n
def __call__(self,iterable,*args):
self.iterable = iterable
self.calls = deque()
self.pending = set()
self.pending_return = asyncio.Queue()
self.initialised = False
return self
def __iter__(self):
iterable = iter(self.iterable)
return iter(lambda: tuple(itertools.islice(iterable,self.batch_size)),())
def __aiter__(self):
return self
async def __anext__(self):
self.pending |= {asyncio.create_task(self.iterable.__anext__()) for _ in range(self.batch_size)}
if self.initialised:
future = asyncio.get_running_loop().create_future()
self.calls.append(future)
await future
else:
self.initialised = True
batch = []
while len(batch) < self.batch_size:
done,_ = await asyncio.wait(self.pending,return_when=asyncio.FIRST_COMPLETED)
done = list(done)[0]
batch.append(await done)
self.pending.discard(done)
next_call = self.calls.popleft()
next_call.set_result(None)
return batch
async def consumer(n,a):
start = time.time()
async for x in a:
print(n,x,time.time() - start)
async def producer(q):
for x in range(50):
await asyncio.sleep(0.5)
await q.put(x)
q = IterableAsyncQueue()
# a = Batch(5)(q)
a = aiostream.stream.chunks(q,5)
loop = asyncio.get_event_loop()
loop.create_task(producer(q))
loop.create_task(consumer(1,a))
loop.create_task(consumer(2,a))
loop.run_forever()
使用aiostream.stream.chunks的输出:
1 [0,2,4,6,8] 4.542179107666016
2 [1,3,5,7,9] 5.04422402381897
1 [10,12,14,16,18] 9.575451850891113
2 [11,13,15,17,19] 10.077155828475952
使用我执行的优先批次的输出:
1 [0,1,4] 2.519313097000122
2 [5,8,9] 5.031418323516846
1 [10,11,14] 7.543889045715332
2 [15,18,19] 10.052537202835083
在我看来,优先级批处理从根本上更有用,因为它比chunks更快地产生结果,允许调用代码等待另一批处理。这意味着,如果有m个消费者每个在等待一批大小为n的消费者,则总是在m×n和(m-1)×n个结果之间等待。通过chunks实现,等待的结果数在m和m×n之间变化。
我想知道的是为什么我以前找不到这种实现,这是实现该解决方案的最佳方法吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。