如何解决Asyncio 与多处理:生产者-消费者模型
我正在尝试检索股票价格并在价格出现时对其进行处理。我是并发的初学者,但我认为这种设置似乎适合 asyncio 生产者-消费者模型,其中每个生产者检索股票价格,并将其通过队列传递给消费者。现在消费者已经并行(多处理)进行股票价格处理,因为这项工作是 CPU 密集型的。因此,我将有多个消费者已经在工作,而并非所有生产者都已完成检索数据。此外,我想实施一个步骤,如果消费者发现其正在处理的股票价格无效,我们会为该股票生成一个新的消费者工作。
到目前为止,我有以下玩具代码可以让我到达那里,但是我的 process_data 函数(消费者)有问题。
from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)
#producers
async def retrieve_data(ticker,q):
'''
Pretend we're using aiohttp to retrieve stock prices from a URL
Place a tuple of stock ticker and price into asyn queue as it becomes available
'''
start = time.perf_counter() # start timer
await asyncio.sleep(random.randint(4,8)) # pretend we're calling some URL
price = random.randint(1,100) # pretend this is the price we retrieved
print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds')
await q.put((ticker,price)) # place the price into the asyncio queue
#consumers
async def process_data(q):
while True:
data = await q.get()
print(f"processing: {data}")
with ProcessPoolExecutor() as executor:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor,data_processor,data)
#if output of data_processing failed,send ticker back to queue to retrieve data again
if not result[2]:
print(f'{result[0]} data invalid. Retrieving again...')
await retrieve_data(result[0],q) # add a new task
q.task_done() # end this task
else:
q.task_done() # so that q.join() knows when the task is done
async def main(tickers):
q = asyncio.Queue()
producers = [asyncio.create_task(retrieve_data(ticker,q)) for ticker in tickers]
consumers = [asyncio.create_task(process_data(q))]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers,too. blocks until all items in the queue have been received and processed
for c in consumers:
c.cancel() #cancel the consumer tasks,which would otherwise hang up and wait endlessly for additional queue items to appear
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL','AMZN','TSLA','C','F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')
'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
# start = time.perf_counter()
# tickers = ['AAPL','F']
# asyncio.run(main(tickers))
# print(f'total elapsed time: {time.perf_counter() - start:0.2f}')
下面的 data_processor() 函数,由上面的 process_data() 调用,需要在 Jupyter notebook 的不同单元格中,或者单独的模块中(据我所知,为了避免 PicklingError)
from multiprocessing import current_process
def data_processor(data):
ticker = data[0]
price = data[1]
print(f'Started {ticker} - {current_process().name}')
start = time.perf_counter() # start time counter
time.sleep(random.randint(4,5)) # mimic some random processing time
# pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
if price % 2==0:
is_valid = True
else:
is_valid = False
print(f"{ticker}'s price {price} validity: --{is_valid}--"
f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
return (ticker,price,is_valid)
问题
-
我没有使用 python 的多处理模块,而是使用了 concurrent.futures 的 ProcessPoolExecutor,我读到它与 asyncio (What kind of problems (if any) would there be combining asyncio with multiprocessing?) 兼容。但似乎我必须在检索执行程序调用的函数的输出 (
result
) 和能够并行运行多个子进程之间做出选择。使用下面的构造,子进程按顺序运行,而不是并行运行。with ProcessPoolExecutor() as executor: loop = asyncio.get_running_loop() result = await loop.run_in_executor(executor,data)
删除 result = await
前面的 loop.run_in_executor(executor,data)
允许并行运行多个使用者,但随后我无法从父进程收集它们的结果。为此,我需要 await
。然后当然剩下的代码块会失败。
如何让这些子进程并行运行并提供输出?也许它需要一个与生产者-消费者模型不同的结构或其他东西
-
请求再次检索无效股票价格的部分代码有效(前提是我可以从上面获得结果),但它在调用它的子进程中运行并阻止新消费者的创建,直到请求得到满足。有没有办法解决这个问题?
#if output of data_processing failed,send ticker back to queue to retrieve data again if not result[2]: print(f'{result[0]} data invalid. Retrieving again...') await retrieve_data(result[0],q) # add a new task q.task_done() # end this task else: q.task_done() # so that q.join() knows when the task is done
解决方法
但似乎我必须在检索执行程序调用的函数的输出(结果)和能够并行运行多个子进程之间做出选择。
幸运的是,情况并非如此,您还可以使用 asyncio.gather()
一次等待多个项目。但是你从队列中一个一个地获取数据项,所以你没有一批要处理的项。最简单的解决方案是启动多个消费者。替换
# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]
与:
# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]
每个消费者都将等待一个单独的任务完成,但这没关系,因为您将有一整套并行工作,这正是您想要的。
此外,您可能希望将 executor
设为全局变量并且不使用 with
,以便所有消费者共享进程池并持续到程序。这样,消费者将重用已经生成的工作进程,而不必为从队列中接收到的每个作业生成一个新进程。 (这就是拥有进程“池”的全部意义。)在这种情况下,您可能希望在程序中不再需要执行程序的地方添加 executor.shutdown()
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。