How to fix a MemoryError from Dask DataFrame GroupBy.size()
I have two large CSV files, each with roughly 28 million rows. I perform an inner join, add columns to the resulting Dask DataFrame, and then call GroupBy.size() on some of those columns to get counts. In this example the inputs are two Parquet files that were generated from the original CSVs.
The end-to-end program does run on an 8-core / 32 GB RAM machine and produces a 4x6 pandas DataFrame from the GroupBy.size(), but on 16 GB and 10 GB RAM machines it fails with a MemoryError.
What can I do to avoid this memory error?
Here is the offending code:
import dask.dataframe as dd

def merge(ubs_dd, br_dd):
    return dd.merge(ubs_dd, br_dd, left_on='mabid', right_on='brid', how='inner', suffixes=('_ubs', '_br'))  # slow
    # return dd.merge(ubs_dd, left_index=True, right_index=True)  # fast

def reconcile(merged_dd):
    merged_dd['amount_different'] = merged_dd['AMOUNT_ubs'].astype(float) - merged_dd['AMOUNT_br'].astype(float)
    merged_dd['amount_break'] = merged_dd['amount_different'].abs() >= 1  # +/- $1 tolerance
    merged_dd['billable_break'] = merged_dd['BILLABLE_ubs'] == merged_dd['BILLABLE_br']
    merged_dd['eligible_break'] = merged_dd['ELIGIBLE_ubs'] == merged_dd['ELIGIBLE_br']
    return merged_dd

def metrics_report(merged_dd):
    return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0: 'count'}).compute()

merged_dd = merge(ubs_dd, br_dd)
merged_dd = reconcile(merged_dd)
metrics = metrics_report(merged_dd)
Here is the error I receive at about 70% completion when running on the low-memory machines:
generating final outputs
[############################ ] | 70% Completed | 29min 19.5s
Traceback (most recent call last):
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py",line 155,in <module>
metrics = metrics_report(merged_dd)
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py",line 115,in metrics_report
return merged_dd.groupby(['amount_break','eligible_break']).size().reset_index().rename(columns={0:'count'}).compute()
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py",line 167,in compute
(result,) = compute(self,traverse=False,**kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py",line 452,in compute
results = schedule(dsk,keys,**kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\threaded.py",line 84,in get
**kwargs
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py",line 486,in get_async
raise_exception(exc,tb)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py",line 316,in reraise
raise exc
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py",line 222,in execute_task
result = _execute_task(task,data)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\core.py",line 121,in _execute_task
return func(*(_execute_task(a,cache) for a in args))
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\dataframe\shuffle.py",line 780,in collect
res = p.get(part)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py",line 73,in get
return self.get([keys],**kwargs)[0]
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py",line 79,in get
return self._get(keys,**kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\encode.py",line 28,in _get
raw = self.partd._get(keys,**kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\buffer.py",line 54,in _get
self.slow.get(keys,lock=False)))
MemoryError