Asyncio 与多处理:生产者-消费者模型

如何解决Asyncio 与多处理:生产者-消费者模型

我正在尝试检索股票价格并在价格出现时对其进行处理。我是并发的初学者,但我认为这种设置似乎适合 asyncio 生产者-消费者模型,其中每个生产者检索股票价格,并将其通过队列传递给消费者。现在消费者已经并行(多处理)进行股票价格处理,因为这项工作是 CPU 密集型的。因此,我将有多个消费者已经在工作,而并非所有生产者都已完成检索数据。此外,我想实施一个步骤,如果消费者发现其正在处理的股票价格无效,我们会为该股票生成一个新的消费者工作。

到目前为止,我有以下玩具代码可以让我到达那里,但是我的 process_data 函数(消费者)有问题。

from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)

#producers
async def retrieve_data(ticker,q):
    '''
    Pretend we're using aiohttp to retrieve stock prices from a URL
    Place a tuple of stock ticker and price into asyn queue as it becomes available
    '''
    start = time.perf_counter() # start timer
    await asyncio.sleep(random.randint(4,8)) # pretend we're calling some URL
    price = random.randint(1,100) # pretend this is the price we retrieved
    print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds') 
    await q.put((ticker,price)) # place the price into the asyncio queue
    

#consumers
async def process_data(q):
    while True:
        data = await q.get()
        print(f"processing: {data}")
        with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor,data_processor,data)
            #if output of data_processing failed,send ticker back to queue to retrieve data again
            if not result[2]: 
                print(f'{result[0]} data invalid. Retrieving again...')
                await retrieve_data(result[0],q) # add a new task
                q.task_done() # end this task
            else:
                q.task_done() # so that q.join() knows when the task is done
            
async def main(tickers):       
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(ticker,q)) for ticker in tickers]
    consumers = [asyncio.create_task(process_data(q))]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers,too. blocks until all items in the queue have been received and processed
    for c in consumers:
        c.cancel() #cancel the consumer tasks,which would otherwise hang up and wait endlessly for additional queue items to appear
    

    
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL','AMZN','TSLA','C','F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
#     start = time.perf_counter()
#     tickers = ['AAPL','F']
#     asyncio.run(main(tickers))
#     print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

下面的 data_processor() 函数,由上面的 process_data() 调用,需要在 Jupyter notebook 的不同单元格中,或者单独的模块中(据我所知,为了避免 PicklingError)

from multiprocessing import current_process

def data_processor(data):
    ticker = data[0]
    price = data[1]
    
    print(f'Started {ticker} - {current_process().name}')
    start = time.perf_counter() # start time counter
    time.sleep(random.randint(4,5)) # mimic some random processing time
    
    # pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
    if price % 2==0:
        is_valid = True
    else:
        is_valid = False
    
    print(f"{ticker}'s price {price} validity: --{is_valid}--"
          f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
    return (ticker,price,is_valid)

问题

  1. 我没有使用 python 的多处理模块,而是使用了 concurrent.futures 的 ProcessPoolExecutor,我读到它与 asyncio (What kind of problems (if any) would there be combining asyncio with multiprocessing?) 兼容。但似乎我必须在检索执行程序调用的函数的输出 (result) 和能够并行运行多个子进程之间做出选择。使用下面的构造,子进程按顺序运行,而不是并行运行。

    with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor,data)  
    

删除 result = await 前面的 loop.run_in_executor(executor,data) 允许并行运行多个使用者,但随后我无法从父进程收集它们的结果。为此,我需要 await。然后当然剩下的代码块会失败。

如何让这些子进程并行运行并提供输出?也许它需要一个与生产者-消费者模型不同的结构或其他东西

  1. 请求再次检索无效股票价格的部分代码有效(前提是我可以从上面获得结果),但它在调用它的子进程中运行并阻止新消费者的创建,直到请求得到满足。有没有办法解决这个问题?

    #if output of data_processing failed,send ticker back to queue to retrieve data again
    if not result[2]: 
            print(f'{result[0]} data invalid. Retrieving again...')
            await retrieve_data(result[0],q) # add a new task
            q.task_done() # end this task
        else:
            q.task_done() # so that q.join() knows when the task is done
    

解决方法

但似乎我必须在检索执行程序调用的函数的输出(结果)和能够并行运行多个子进程之间做出选择。

幸运的是,情况并非如此,您还可以使用 asyncio.gather() 一次等待多个项目。但是你从队列中一个一个地获取数据项,所以你没有一批要处理的项。最简单的解决方案是启动多个消费者。替换

# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]

与:

# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]

每个消费者都将等待一个单独的任务完成,但这没关系,因为您将有一整套并行工作,这正是您想要的。

此外,您可能希望将 executor 设为全局变量并且使用 with,以便所有消费者共享进程池并持续到程序。这样,消费者将重用已经生成的工作进程,而不必为从队列中接收到的每个作业生成一个新进程。 (这就是拥有进程“池”的全部意义。)在这种情况下,您可能希望在程序中不再需要执行程序的地方添加 executor.shutdown()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-