如何在Python中提高并行循环的效率 Dask或Spark发生了什么多处理和Dask应该有帮助

如何解决如何在Python中提高并行循环的效率 Dask或Spark发生了什么多处理和Dask应该有帮助

与Matlab的parloop相比,我对Python的并行循环效率低感兴趣。 在这里,我提出一个简单的寻根问题,即在ab之间强行强制初始10 ^ 6初始猜测。

import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing

# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))

def forfunc(x0):
    q = [root(func,xi).x for xi in x0]
    q = np.array(q).T[0]
    return q

# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses

# the single-process loop
q = forfunc(x0)

# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()

单进程循环耗时1分钟26s,并行循环耗时1min 7s。在加速比为1.28的情况下,我看到了一些改进,但是在这种情况下,效率(timeloop/timeparallel/n_process)为0.32。

这里发生了什么以及如何提高效率? 我在做错什么吗?

我还尝试了两种方式使用dask.delayed

import dask

# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])

# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])

在这里,两者都比单进程循环花费更多的时间。 第一次尝试的时间为3分钟,第二次尝试的时间为27秒。

解决方法

Dask(或Spark)发生了什么

通过单进程测试,您的循环可以在90秒内执行一百万个任务。因此,平均每个任务花费您的CPU大约90微秒。

在提供灵活性和弹性的分布式计算框架(如Dask或Spark)中,任务的相关开销很小。每个任务的Dask开销低至200 microseconds。 Spark 3.0 documentation建议Spark可以支持短至200 毫秒的任务,这也许意味着Dask的开销实际上比Spark少1000倍。听起来好像Dask在这里确实做得很好!

如果您的任务快于框架的每个任务的开销,则与在相同数量的机器/内核上手动分配工作相比,使用它会发现性能更差。在这种情况下,您会遇到这种情况。

在分块数据Dask示例中,您只有几个任务,因此可以减少开销,从而获得更好的性能。但是,相对于原始的多处理,您可能会因为Dask的开销而对性能造成很小的影响,或者您没有使用Dask集群并在单个进程中运行任务。

多处理(和Dask)应该有帮助

对于这种令人尴尬的并行问题,使用多处理的结果通常是出乎意料的。您可能需要确认计算机上物理内核的数量,尤其要确保没有其他事情在积极利用您的CPU内核。一无所知,我想这就是元凶。

在具有两个物理核心的笔记本电脑上,您的示例需要:

  • 单个过程循环2分钟1秒
  • 两个过程1分钟2秒
  • 四个过程1分钟
  • 1分钟5秒,用于一个带有nc=2的分块式Dask示例,该示例分为两个块,以及一个由两个工作线程和每个工作线程一个线程组成的LocalCluster。可能需要仔细检查您正在集群上运行。

通过两个进程获得大约2倍的加速与我的笔记本电脑上的预期一致,因为看到更多此进程或CPU绑定任务带来的收益很少或没有收益。与原始多重处理相比,Dask还增加了一些开销。

%%time
​
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s,sys: 1.68 s,total: 1min 57s
Wall time: 2min 1s
%%time
​
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms,sys: 70.8 ms,total: 163 ms
Wall time: 1min 2s
%%time
​
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,nc)))
pool.close()
CPU times: user 118 ms,sys: 94.6 ms,total: 212 ms
Wall time: 1min
from dask.distributed import Client,LocalCluster,wait
client = Client(n_workers=2,threads_per_worker=1)

%%time
​
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks,broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph: 
  (array([1.000004,1.000012,1.00002,...,4.99998 ... 2,5.      ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func,big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func,big_future)  # good
  % (format_bytes(len(b)),s)
CPU times: user 3.67 s,sys: 324 ms,total: 4 s
Wall time: 1min 5s

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-