How can I fix ipyparallel failing on a large number of jobs?

I am not familiar with ipyparallel and have tried searching for help, but I could not find anything I could understand that applies to my situation. If the answer is already out there, I apologize!

I am building a Jupyter notebook that can be used to convert binary files to ASCII files. I will share as much of the code as I can, but I cannot provide a fully runnable example.

The notebook does run: converting a few files (fewer than 1000) works as I expect, but when I increase the number of files to convert it behaves as if nothing is happening, and even at 5000 files it often never finishes.

My code is as follows:
    def convert_bhb_tff_parallel(p):
        from pathlib import Path
        from pyrosap import WLSFile, BhbFile

        def read_wfh_params(wfh):
            params = {}
            with open(wfh) as f:
                for line in f.readlines():
                    if line.startswith('!SHEILAPM'):
                        _, param, value = line.split()
                        params[param] = value
            return params

        def convert_bhb_to_tff(bhb):
            """Convert the bhb file to tff.

            Ensure WLS comparison is run and a log written.

            Args:
                bhb (Path): Pathlib path of the bhb file.

            Returns:
                str: The log line
                bool: if an error occurred in the WLS comparison (True/False)
            """
            wls = None
            sup_file = sup_files.get(d[bhb.stem], None)
            if not sup_file:
                line = '{:<40} {:<20}'
                line += ' {:<25.5} {:<20.5}'.format(0.0, 0.0)
                line += ' || WARNING,Support file missing.'
                error = 3
            else:
                if sup_file_type == 'WLS':
                    wls = WLSFile(sup_file)
                    wls.read_file()
                    params = wls.parameters
                elif sup_file_type == 'WFH':
                    params = read_wfh_params(sup_file)
                bhb_f = BhbFile(bhb, select=select_dict)
                bhb_f.read_file()
                tff_path = Path(out_dir) / (sup_file.stem + '.tff')
                line, error = bhb_f.convert_to_tff(tff_path, params,
                                                   file_format=tff_format,
                                                   map_data=column_mapping,
                                                   wls=wls)
                if not line:
                    line = '{:<40} {:<20}'
                    line += ' {:<25} {:<20} || ACCEPTED. BHB file converted.'.format(0.0, 0.0)
            line = line.format(bhb.stem, d[bhb.stem])
            return line, error

        errors = 0
        bhb_name, sup_file_type, d, select_dict = p[:4]
        bhb_files, sup_files, out_dir, tff_format, column_mapping = p[4:]
        bhb = bhb_files.get(bhb_name, None)
        if bhb:
            line, error = convert_bhb_to_tff(bhb)
        else:
            line = '{:<40} {:<20}'.format(bhb_name, d[bhb_name])
            line += ' {:<25.5} {:<20.5}'.format(0.0, 0.0)
            line += ' || WARNING,BHB file missing.'
            error = 4
        if error > 0:
            errors = True
        return line, errors
    import subprocess
    import time
    from pathlib import Path

    from ipyparallel import Client

    # num_engines, d, sup_file_type, select_dict, bhb_files, sup_files,
    # out_dir, tff_format and column_mapping are defined in earlier notebook cells.
    subprocess.Popen(["ipcluster", "start", "-n={:d}".format(num_engines)])
    print('Notebook sleeping for 5 seconds in order to allow the parallel computing to start')
    time.sleep(5)

    log = []
    full_log = []
    error_log = []
    error = False
    rc = Client()
    view = rc.load_balanced_view()
    try:
        for bhb_name in d.keys():
            # The tuple must match the unpacking at the top of convert_bhb_tff_parallel.
            args = (bhb_name, sup_file_type, d, select_dict,
                    bhb_files, sup_files, out_dir, tff_format, column_mapping)
            out = view.apply_async(convert_bhb_tff_parallel, args)
            log.append(out)
        rc.wait_interactive(log)
        error = any([ar.get()[1] > 0 for ar in log])
        full_log.extend([ar.get()[0] for ar in log])
        error_log.extend([ar.get()[0] for ar in log if ar.get()[1] > 0])
        flog_fn = Path(out_dir) / 'log.log'
        elog_fn = Path(out_dir) / 'errorlog.log'
        header = '{:<40} {:<20} {:<25} {:<20} || ASSESSMENT\n\n'.format(
            'BHB file name', 'Support file name',
            'Wave Elevation Tolerance', 'Correlation factor'
        )
        with open(flog_fn, 'w') as f:
            f.write(header)
            f.write('\n'.join(full_log))
        with open(elog_fn, 'w') as f:
            f.write(header)
            f.write('\n'.join(error_log))
        if error is True:
            print('Errors/warnings were found when comparing wave data, please check the log file.')
        else:
            print('No errors were found during wave comparison.\n\n')
    finally:
        close_parallel = subprocess.Popen(["ipcluster", "stop"])
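One fragile spot in the driver above is the fixed time.sleep(5): with many engines the cluster may not be fully up after five seconds, so early tasks queue against too few (or zero) engines. A more robust pattern is to poll until the engines are registered. The sketch below is a generic, hypothetical polling helper (it does not require ipyparallel to run); with ipyparallel the predicate would check len(rc.ids) against num_engines.

```python
import time

def wait_until(predicate, timeout=60.0, interval=0.5):
    """Poll predicate() until it returns True or the timeout elapses.

    Returns True if the predicate succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# With ipyparallel this would look like (assumption, not tested here):
#   rc = Client()
#   assert wait_until(lambda: len(rc.ids) >= num_engines, timeout=120)
```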
Is there something I am doing wrong here, given that it starts to fail as the length of d grows?
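For reference, one common reason runs of this size stall is that one apply_async per file pushes thousands of individual messages (each carrying the full dictionaries in the argument tuple) through the hub. Batching the work so that each task handles a chunk of files cuts the message count dramatically. The sketch below illustrates only the chunking pattern; it uses a ThreadPoolExecutor and a hypothetical convert_chunk stand-in so it runs without an ipcluster, but the same shape applies to view.map over the chunks.

```python
from concurrent.futures import ThreadPoolExecutor  # stands in for an ipyparallel view

def chunked(items, size):
    """Yield successive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def convert_chunk(names):
    # Hypothetical stand-in for the per-file conversion: one task now
    # processes a whole chunk and returns its log lines.
    return ['{}: converted'.format(name) for name in names]

names = ['file_{:05d}'.format(i) for i in range(5000)]
with ThreadPoolExecutor(max_workers=4) as ex:
    # 20 tasks of 250 files each instead of 5000 single-file tasks.
    results = list(ex.map(convert_chunk, chunked(names, 250)))
log = [line for chunk in results for line in chunk]
```

With ipyparallel, shared read-only data (the big dictionaries) can also be pushed to the engines once instead of being serialized into every task's arguments.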