如何解决如何在Flink中获取和修改Jobgraph?
我想修改jobGraph,例如getVertices(),ColocationGroup()
,并尝试执行以下操作:
val s = StreamExecutionEnvironment.getExecutionEnvironment
val p = new Param()
s.addSource(new MySource(p))
.map(new MyMap(p))
.addSink(new MySink(p))
val g = s.getStreamGraph.getJobGraph()
val v = g.getVertices()
s.executeAsync()
但是在调试模式下,顶点不是我的MySource
运算符,并且我很困惑getStreamGraph
可以接受JobID
,在作业开始之前怎么会有id。 。,所以我想也许我应该在execute
之后得到它,然后将代码更改为following,
val s = StreamExecutionEnvironment.getExecutionEnvironment
val p = new Param()
s.addSource(new MySource(p))
.map(new MyMap(p))
.addSink(new MySink(p))
s.executeAsync()
val g = s.getStreamGraph.getJobGraph()
val v = g.getVertices()
但是代码陷入了s.getStreamGraph.getJobGraph()
如何实际获得工作图?...
解决方法
无法修改JobGraph。各种API构造了JobGraph,然后将其传递给JobManager,后者将其转换为执行图,然后对其进行调度并在任务管理器提供的任务槽中运行。建立作业后,没有API允许您修改作业的拓扑。 (这有一天会得到支持,这是不可想象的,但现在不可能了。)
如果您只想查看它的表示,System.out.println(env.getExecutionPlan())
很有趣。
如果您希望从静态DAG中获得更多动力,那么Stateful Functions API会更加灵活。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。