如何解决没有中间累加器的异步 get()?
我有以下循环。我简化了代码。 inner() 在类似的循环中解析同一个文件,当然没有 .remote() 调用
def outer(self,file):
rv = []
with open(file,'r') as f :
acc1,acc2 = [],[]
for i,line in enumerate(f) :
if i % 10 == 0 : print(f'> {i} ',end="\n")
if i > 25 : break
outeri,txt = line.split(':')
abc = ClassX.inner.remote(txt,file,int(outeri))
acc2.append(abc) #lst of obj-refs
acc1.append(int(outeri))
rv = [z for z in zip(acc1,ray.get([a for a in acc2])) ]
return rv
我想将数据异步收集到 rv 中,我在这里这样做,但没有中介“acc2”。
我有两个问题:
-
相反/另外-收集数据我想异步执行一些 SQL 代码,但随着结果的出现。
-
print() 进度不是逐步打印,而是在最后一次打印。我必须把它移到“inner()”
试图理解并行迭代器,但似乎很难/不可行,如何在 readline 之后压缩步骤直到 .remote() 调用
解决方法
编辑:根据说明更改答案。
为了按照对象引用到达的顺序处理对象引用,您需要使用 ray.wait
在对象引用返回时获取对象引用,然后仅在对象引用上调用 ray.get
准备好了。
def outer(file):
outer_is = {}
unfinished_refs = []
with open(file,"r"):
for i,line in enumerate(f):
outeri,txt = line.split(":")
ref = ClassX.inner.remote(txt,file,int(outeri))
outer_is[ref] = int(outeri)
unfinished_refs.append(ref)
# This part will get and process tasks as they finish
while len(unfinished_refs) > 0:
finished,unfinished_refs = ray.wait(unfinished_refs)
outeri = outer_is[finished[0]]
result = ray.get(finished[0])
### Process the result here ###
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。