如何解决如何将超时设置为spark任务或map操作? 或跳过长时间运行的任务
我正在使用spark并行处理一百万个任务。例如,训练一百万个单独的模型。
我需要确保尽可能多的成功,但失败要少。
在火花中,如果只有一种模型找不到最佳解决方案,则它可能会被吊死并永远运行。在这种情况下,spark作业将永远无法完成,并且杀死该作业也无法将其他999,999型号保存到hdfs。
这个问题真的很痛苦。
我搜索了一下,但没有找到有用的东西:
-
spark.task.maxFailures
:没有失败,所以这不会生效。 -
spark.network.timeout
:没有网络问题。 -
spark.executor.heartbeatInterval
:没有亲戚。
核心训练代码,主要使用rdd.map进行训练
df1 = (df.rdd
.map(lambda r: r.asDict())
.map(lambda d: transform_data(d))
.map(lambda d: create_model(d))
.map(lambda d: model_fit(d))
.map(lambda d: pickle_model(d))
)
如何为Spark任务设置超时时间?还是有什么好东西?
解决方法
我认为这不能成为配置级别的控制器。您可能只想将其应用于Spark任务的一个子集。 SparkListener
可以提供帮助,因为您可以挂接到任务,阶段,工作级别,然后使用sparkContenxt
决定取消任务。
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart): Unit
在上面,您可以实现超时逻辑。
使用def cancelStage(stageId: Int)
您可以从侦听器事件中获取特定的ID
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。