如何解决如何在python中的类中使用射线并行?
我想使用ray task 方法而不是ray actor 方法来并行化类中的方法。后者的原因似乎需要更改实例化类的方式(如here所示)。下面是一个玩具代码示例,以及错误
import numpy as np
import ray
class MyClass(object):
def __init__(self):
ray.init(num_cpus=4)
@ray.remote
def func(self,x,y):
return x * y
def my_func(self):
a = [1,2,3]
b = np.random.normal(0,1,10000)
result = []
# we wish to parallelise over the array `a`
for sub_array in np.array_split(a,3):
result.append(self.func.remote(sub_array,b))
return result
mc = MyClass()
mc.my_func()
>>> TypeError: missing a required argument: 'y'
由于ray似乎并不“意识到”该类而出现错误,因此它需要一个自变量self
。
如果我们不使用类,则代码可以正常工作
@ray.remote
def func(x,y):
return x * y
def my_func():
a = [1,3,4]
b = np.random.normal(0,10000)
result = []
# we wish to parallelise over the list `a`
# split `a` and send each chunk to a different processor
for sub_array in np.array_split(a,4):
result.append(func.remote(sub_array,b))
return result
res = my_func()
ray.get(res)
>>> [array([-0.41929678,-0.83227786,-2.69814232,...,-0.67379119,-0.79057845,-0.06862196]),array([-0.83859356,-1.66455572,-5.39628463,-1.34758239,-1.5811569,-0.13724391]),array([-1.25789034,-2.49683358,-8.09442695,-2.02137358,-2.37173535,-0.20586587]),array([ -1.67718712,-3.32911144,-10.79256927,-2.69516478,-3.1623138,-0.27448782])]```
我们看到输出是预期的4个数组的列表。如何使用ray使MyClass
进行并行处理?
解决方法
一些提示:
-
通常建议您仅在python中的函数或类上使用
ray.remote
装饰器(而不是绑定方法)。 -
在函数的构造函数中调用
ray.init
时应非常小心,因为ray.init
并不是幂等的(这意味着程序将在以下情况下失败)您实例化了MyClass
的多个实例。相反,您应该确保ray.init
在程序中仅运行一次。
我认为使用Ray可以达到两种效果。
您可以将func
从类中移出,这样它将成为函数而不是绑定方法。请注意,在这种方法中,MyClass
将被序列化,这意味着func
对MyClass
所做的更改将不会在函数之外的任何地方反映出来。在您的简化示例中,这似乎没有问题。这种方法最容易实现最大并行度。
@ray.remote
def func(obj,x,y):
return x * y
class MyClass(object):
def my_func(self):
...
# we wish to parallelise over the array `a`
for sub_array in np.array_split(a,3):
result.append(func.remote(self,sub_array,b))
return result
您可以考虑的另一种方法是使用async actors。在这种方法中,ray actor将通过异步处理并发,但这comes with the limitations of asyncio。
@ray.remote(num_cpus=4)
class MyClass(object):
async def func(self,y):
return x * y
def my_func(self):
a = [1,2,3]
b = np.random.normal(0,1,10000)
result = []
# we wish to parallelise over the array `a`
for sub_array in np.array_split(a,3):
result.append(self.func.remote(sub_array,b))
return result
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。