如何解决使用 multiprocessing 或 concurrent.futures 将数据帧中的多列传递到函数中
问题:如何使用 multiprocessing
或 concurrent.futures
将数据框的列传递给每行的函数?
详情:
对于 df
中的每一行,我想将其列 leader
和 years
传递给函数 print_sentences()
。我想以并行方式使用该函数,其中每一行都是异步打印的。例如,我想使用 concurrent.futures.Executor.map
。
它需要在 Python 3.6.
Reprex:我的实际问题在计算上要求很高,所以这里是一个简化的 reprex
:
import pandas as pd
import numpy as np
import concurrent.futures
df = pd.DataFrame(np.array([["Larry",3,"Germany"],["Jerry",5,"Sweden"],["George",12,"UK"]]),columns=['leader','years','score'])
def print_sentences(df):
print(df["leader"] + " has been leader for " + df["years"] + " years")
print_sentences(df)
背景:
与此问题相关的其他问题似乎与 dataframe
以外的对象类型有关。
当我读入数据帧 .csv
时,我的具体问题就开始了。我想将此数据框的每一行的列传递给某个函数。我的实际功能(对 reprex 进行了极大简化)在计算上要求很高。它抓取数据并将其保存到 .json
。因此,每一行都充当不同的查询(例如,输入不同的领导者的姓名和分数)。
为了优化这一点,我希望行以并行方式映射到函数中。
我用上面的 reprex
简化了我的问题。
提前感谢您的帮助。
解决方法
试试这个,
#Edits 反映您的用例。
import pandas as pd
import numpy as np
from multiprocessing import cpu_count,Pool
cores = cpu_count() #Number of CPU cores on your system
partitions = cores #Define as many partitions as you want
def parallelize(data,func):
data_split = np.array_split(data,partitions)
pool = Pool(cores)
data = pd.concat(pool.map(func,data_split))
pool.close()
pool.join()
return data
def print_sentences(cols):
leader,years = cols[0],cols[1]
print(leader + " has been leader for " + years + " years")
df = pd.DataFrame(np.array([["Larry",3,"Germany"],["Jerry",5,"Sweden"],["George",12,"UK"]]),columns=['leader','years','score'])
data = df.copy()
data = parallelize(data,print_sentences)
data.apply(print_sentences,axis=1)