如何解决python并行读取csv文件并连接数据框
我有一个应用程序,每个应用程序会读取50个大型csvs文件,每个文件约400MB。现在,我正在阅读这些内容以创建一个数据帧,并最终将所有这些链接成一个单一的数据帧。我想同时进行以加快整个过程。所以我的下面代码看起来像这样:
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time
Class dataProvider:
def __init__(self):
self.df=pd.DataFrame()
self.pool = ThreadPool(processes=40)
self.df_abc=pd.DataFrame()
self.df_xyz=pd.DataFrame()
self.start=time()
def get_csv_data(self,filename):
return pd.read_csv(filename)
def get_all_csv_data(self,filename):
self.start=time()
df_1 = self.pool.apply_sync(self.get_csv_data,('1.csv',),callback=concatDf)
df_2 = self.pool.apply_sync(self.get_csv_data,('2.csv',callback=concatDf)
total_time=time()-self.start
def concatDf(self):
self.df_abc=pd.concat([df_1,df_2])
self.df_xyz=self.df_abc.iloc[:,1:]
return self.df_xyz
我看到以下代码问题:
- 如果我的apply_sync调用调用了相同的回调,那么我怎么知道当前的回调已被df_1行或df_2以上的哪个调用确切地调用了? 2)我想连接不同apply_sync的输出,如何在concatDf回调函数中做到这一点?
- 我如何知道所有apply_sync调用的回调已完成,以便我可以将所有50个csvs返回的串联数据帧返回?
- 有没有更好,更有效的方法来做到这一点?
谢谢
解决方法
编辑:仅在有足够的可用RAM时使用此解决方案。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import pandas as pd
from glob import glob
files = glob("*.csv")
def read_file(file):
return pd.read_csv(file)
# I would recommend to try out whether ThreadPoolExecutor or
# ProcessPoolExecutor is faster on your system:
with ThreadPoolExecutor(4) as pool:
df = pd.concat(pool.map(read_file,files))
print(df)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。