如何解决达斯克杀害工人,同时读取并保存大的.csv文件
我大约有1.5 TB的数据分为大约5500个json文件,我需要使用map_partition处理(NN搜索)并保存结果。 (GCS)。 每个.json文件的大小在100-400 MB之间。 另外,我使用一个集群,每个集群有250个CPU,每个处理器有16个内核,每个内存有60 GB。因此,整体内存限制约为14 TB。 但是,其中一名工人总是死于某种我无法理解的原因。
distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',1645)",<Worker 'tcp://10.0.8.221:42137',name: tcp://10.0.8.221:42137,memory: 0,processing: 15>)
即使我运行下面的朴素代码块来简单地加载json数据,将其转换为dataframe并将其保存为csv,我也会遇到相同的问题。 但是,对于较小的数据集,效果很好。
data = db.read_text("gs://path-to-input-files/*").map(json.loads)
# data = data.repartition(npartitions=10000)
data = data.to_dataframe(meta={"a": object,"b": np.int32,"c": object})
data.to_csv("gs://path-to-output-files/*.csv,index=False)
Traceback (most recent call last):
File "run_search.py",line 69,in <module>
nn_search.run(client)
File "/github_consumer-edge_mid-match-inference/mid_match/k_nn_search.py",line 92,in run
data.to_csv(
File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py",line 1408,in to_csv
return to_csv(self,filename,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/csv.py",line 892,in to_csv
delayed(values).compute(**compute_kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 166,in compute
(result,) = compute(self,traverse=False,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 444,in compute
results = schedule(dsk,keys,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 2682,in get
results = self.gather(packed,asynchronous=asynchronous,direct=direct)
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1976,in gather
return self.sync(
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 831,in sync
return sync(
File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 339,in sync
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 323,in f
result[0] = yield future
File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py",line 735,in run
value = future.result()
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1841,in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',processing: 15>)
这是仪表板的视频。 https://drive.google.com/file/d/1Ywbn2lLEBkRi5a0iD7ZJ1Hx2GNUKfgjG/view?usp=sharing
随着时间的推移,存储的字节数(从仪表板-左上方的直方图)一直在增加,这就是为什么我认为其中一个工作程序内存不足的原因。
从仪表板上看,读取json并将其转换为dask数据帧似乎比保存.csv并上传到gcs快得多。
- 是read_text()读取数据的速度快于工作人员释放内存的速度吗?
- 我是否应该等到对特定分区执行data.to_csv()之后,工人才能读取更多数据?如果是这样,那我应该怎么做呢?诸如dask.distributed.wait之类的东西?
- 我尝试将数据帧重新划分为更多的分区,例如10K甚至50K(假设重新分区将导致较小的数据大小),但仍然无济于事。
- 我在这里想念什么?有人可以帮我指出一些对我有帮助的文章或文件吗?
PS:我是Dask和分布式计算的新手。
引用:What do KilledWorker exceptions mean in Dask?
谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。