如何解决Flink作业中每1:45H之后CPU负载上升
在上一个问题(here)中,我问为什么我的Flink 1.10.1作业会导致运行一小时后CPU负载开始增加,却从未下降,因此,我开始使用VisualVM监视作业,jProfiler和MemoryAnalyzer,我发现了一些错误,这些错误与不必要的对象创建,HashMaps和其他一些固定的问题有关。然后,内存消耗和CPU负载都不错,但是,CPU负载几乎每隔1:45H就开始表现出类似“ hills”的状态。
我具有与以前相同的执行图,但我也将其放在这里:
DataStream<Event> from_source = rabbitConsumer
.flatMap(new RabbitMQConsumer())
.assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source
.filter(new NullidsFilterFunction())
KeyedStream<String,Event> keyed_stream = data_stream.keyby(k->k.id);
/*one-> stateful operator: ProcessWindowFunction*/
data_stream.map(new EventCount(x))
.keyBy(k -> new Date(k.timestamp.getTime()).toString())
.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*two-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*three*/
keyed_stream.filter(new FilterFunction())
.map(new MapClass())
.addSink(new SinkFuncion());
/*four*/
pw_keyed_stream = data_stream
.filter(new FilterFunction())
.map(new MapClass())
.keyBy(k -> k.id+ new Date(k.timestamp.getTime()));
pw_keyed_stream.addSink(new SinkFuncion());
/*five-> stateful operator: ProcessWindowFunction*/
pw_keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*Six-> stateful operator with 4 ConcurrentHashMap into the state: RichFlatMapFunction*/
keyed_stream.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
/*seven-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*eight-> stateful operator: RichFlatMapFunction*/
data_stream.filter(new FilterFunction())
.keyBy(k -> k.type.equals("something") ? k.one : k.two)
.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
更改:
- 从JDK8升级到JDK9(两个JDK的行为相同)
- 从CMS更改为G1收集器(两个GC的山丘行为相同)
我希望当流量下降时,负载也会下降,但是即使负载很低,这些山丘也会停留在那儿。
再次感谢。 亲切的问候!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。