How to clear objects from the object store in Ray?

I am trying out the promising multiprocessing package ray. I have a problem I can't seem to solve. My program runs fine the first time, but on a second run this exception is raised on the ray.put() line:

ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff010000000c000000 in object store because it is full. Object size is 2151680255 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

What I want to do: In my actual code (which I intend to write), I need to process many big_data_objects sequentially. I want to hold one big_data_object in memory at a time and perform multiple heavy (independent) computations on it. I want to execute those computations in parallel. When they are done, I have to replace the big_data_object in the object store with a new one and start the computations again (in parallel).

Using my test script, I simulate this by starting the script a second time without ray.shutdown(). If I shut Ray down with ray.shutdown(), the object store is cleared, but re-initialization takes a long time and I cannot process multiple big_data_objects sequentially as I intend.

Which sources of information I studied: I studied the Ray Design Patterns documentation, in particular the section "Antipattern: Closure capture of large / unserializable object" and what the correct pattern should look like. I also worked through the getting-started guide, which led to the test script below.

Minimal example to reproduce the problem: I created a test script to test this. It is as follows:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0, 0] + x

# Define large data
big_data_object = np.random.rand(16400, 16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600, 11600)  # Define an object of approx 1 GB.
# big_data_object = np.random.rand(8100, 8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000, 5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(item))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))


#%% Clean-up object store data - There is still a (huge) memory leak in the object store.
for index in range(4):
    del result_refs[0]

del big_data_object_ref

Where I think it goes wrong: I think I delete all references to the object store at the end of the script. As a result, the objects should be cleared from the object store (as described here). Apparently something is wrong, because the big_data_object remains in the object store. The results, however, are deleted from the object store just fine.

Some debugging info: I inspected the object store using the ray memory command, and this is what I get:

(c:\python\cenv38rl) PS C:\WINDOWS\system32> ray memory
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=20952
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=29368
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=17388
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=24208
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=27684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=6860
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; driver pid=28684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\worker.py:put_object:277 | c:\python\cenv38rl\lib\site-packages\ray\worker.py:put:1489 | c:\python\cenv38rl\lib\site-packages\ray\_private\client_mode_hook.py:wrapper:47 | C:\Users\Stefan\Documents\Python examples\Multiprocess_Ray3_SO.py:<module>:42
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
Plasma memory usage 2052 MiB, 1 objects, 77.41% full

Some things I have tried: If I replace my_function with:

@ray.remote
def my_function(x):  # Later I will have multiple different my_functions to extract separate features from my big_data_objects
    time.sleep(1)
    # data_item = ray.get(big_data_object_ref)
    # return data_item[0,0]+x
    return 5

then the script successfully clears the object store, but my_function can no longer use the big_data_object that I need.

My question is: How do I fix my code so that the big_data_object is removed from the object store at the end of the script, without shutting down Ray?

Note: I installed Ray using pip install ray, which gave me the version I am using now, ray==1.2.0. I use Ray on Windows and develop in Spyder v4.2.5 in a conda (actually miniconda) environment, in case it is relevant.

EDIT: I also tested on an Ubuntu machine with 8 GB RAM. For this I used a big_data_object of 1 GB. I can confirm that the problem also occurs on that machine.

Output of ray memory:

(SO_ray) stefan@stefan-HP-ZBook-15:~/Documents/Ray_test_scripts$ ray memory 
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=18593
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18591
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18590
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; driver pid=17712
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   (put object)  | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:wrapper:47 | /home/stefan/Documents/Ray_test_scripts/Multiprocess_Ray3_SO.py:<module>:43 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/spyder_kernels/customize/spydercustomize.py:exec_code:453
; worker pid=18592
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
Plasma memory usage 1026 MiB, 99.69% full

I have to run the program in Spyder so that after it executes I can still inspect the object store's memory with ray memory. If I run the program in PyCharm, for example, Ray terminates automatically when the script completes, so I cannot check whether my script cleared the object store as intended.

Solution

The problem is that your remote function captures big_data_object_ref, and the reference held there is never deleted. Note that when you do something like this:

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0, 0] + x

# Define large data
big_data_object = np.random.rand(16400, 16400)
big_data_object_ref = ray.put(big_data_object)

big_data_object_ref is serialized into the remote function definition. Thus, until you delete this serialized function definition (which lives inside Ray's internals), there is a permanent pointer to the object.
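The pinning effect of a captured reference can be illustrated in plain Python, without Ray: a closure keeps whatever it captures alive until the function object itself is deleted. This is a minimal sketch; BigData and make_worker are hypothetical names used only for illustration:

```python
import weakref

class BigData:
    """Stand-in for a large object; a weakref lets us observe when it is freed."""
    pass

def make_worker(data):
    def worker():
        return data  # 'data' is captured in worker.__closure__
    return worker

obj = BigData()
probe = weakref.ref(obj)   # probe() returns None once obj is collected
worker = make_worker(obj)  # worker now holds a hidden reference to obj

del obj                        # the driver-side name is gone ...
print(probe() is not None)     # ... but the closure still keeps obj alive

del worker                     # deleting the function releases the capture
print(probe() is None)         # now obj is actually collected
```

In the same way, every worker process that fetches and registers the serialized my_function keeps a reference alive, which matches the LOCAL_REFERENCE entries created at fetch_and_register_remote_function in the ray memory output above.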

Use this type of pattern instead:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(big_data_object, x):
    time.sleep(1)
    return big_data_object[0, 0] + x

# Define large data
# big_data_object = np.random.rand(16400, 16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600, 11600)  # Define an object of approx 1 GB.
big_data_object = np.random.rand(8100, 8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000, 5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)
print("ref in driver:", big_data_object_ref)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(big_data_object_ref, item))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))
print(result_refs)

#%% Clean-up object store data
# for index in range(4):
#     del result_refs[0]
del result_refs

del big_data_object_ref
time.sleep(1000)  # Keep the driver alive so the object store can be inspected with 'ray memory'.

The difference is that we now pass big_data_object_ref as an argument to the remote function, instead of capturing it inside the remote function.

Note: when an object reference is passed to a remote function, it is automatically dereferenced, so there is no need to use ray.get() inside the remote function. If you do want to call ray.get() explicitly inside the remote function, pass the object reference inside a list or dictionary as the argument to the remote function. In that case you get something like:

# Remote function
@ray.remote
def my_function(big_data_object_ref_list, x):
    time.sleep(1)
    big_data_object = ray.get(big_data_object_ref_list[0])
    return big_data_object[0, 0] + x

# Calling the remote function
my_function.remote([big_data_object_ref], item)

Note 2: You are using Spyder, which uses an IPython console. There are currently some known issues between Ray and the IPython console. Just make sure you delete the references inside your script, rather than with commands typed directly into the IPython console (those delete the reference, but do not remove the item from the object store). To inspect the object store with the ray memory command while your script is running, you can put some code like this at the end of your script:

#%% Testing ray
# ... my ray testing code

#%% Clean-up object store data
print("Wait 10 sec BEFORE deletion")
time.sleep(10)  # Now quickly use the 'ray memory' command to inspect the contents of the object store.

del result_refs
del big_data_object_ref

print("Wait 10 sec AFTER deletion")
time.sleep(10)  # Now again use the 'ray memory' command to inspect the contents of the object store.
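Putting the pieces together, the asker's intended sequential workflow (one big object in memory at a time, several parallel computations per object, then replace the object) could look like the sketch below. This is a minimal sketch, assuming the argument-passing pattern above; extract_feature and run_pipeline are hypothetical names, and the random arrays stand in for real data loading:

```python
def extract_feature(data, x):
    # Stand-in for one heavy, independent computation on the big object.
    return data[0][0] + x

def run_pipeline(num_objects=3, num_tasks=4):
    # Requires a machine with Ray installed; call run_pipeline() to run.
    import ray
    import numpy as np

    if not ray.is_initialized():
        ray.init(include_dashboard=False)
    remote_extract = ray.remote(extract_feature)  # equivalent to decorating with @ray.remote

    for step in range(num_objects):
        big_data_object = np.random.rand(8100, 8100)  # real code would load the next object here
        big_data_object_ref = ray.put(big_data_object)
        del big_data_object  # keep only the copy in the object store

        # Launch the independent computations in parallel, passing the ref as an argument.
        result_refs = [remote_extract.remote(big_data_object_ref, x) for x in range(num_tasks)]
        print("Step {}: {}".format(step, ray.get(result_refs)))

        # Drop every reference so this object can be evicted before the next iteration.
        del result_refs
        del big_data_object_ref
```

Because nothing captures big_data_object_ref in a closure, deleting the driver-side references at the end of each iteration should let the object store evict the old object before the next one is put.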
