如何解决如何为每个传入元素实现带有动态超时的flink countTriggerWithTimeout
flink流处理非常新。这是我的要求: 在最近20秒内收到2个或更多元素时提醒用户。如果在20秒内收到少于2个元素,则不会发出警报,只需重新启动计数和时间即可。 每个元素的计数和间隔都不同。
这是我的代码:
dataStream
.keyBy("id")
.window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
.trigger(new CountTriggerWithTimeout<TimeWindow>())
TriggerCode:
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent,W> {
private ReducingStateDescriptor<Long> countState =
new ReducingStateDescriptor<Long>("count",new Sum(),LongSerializer.INSTANCE);
private ReducingStateDescriptor<Long> processedState =
new ReducingStateDescriptor<Long>("processed",LongSerializer.INSTANCE);
@Override
public TriggerResult onElement(SystemEvent element,long timestamp,W window,TriggerContext ctx)
throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(countState);
ReducingState<Long> processed = ctx.getPartitionedState(processedState);
count.add(1L);
processed.add(0L);
if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
processed.add(1L);
return TriggerResult.FIRE_AND_PURGE;
}
if (timestamp >= window.getEnd()) {
return TriggerResult.PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time,TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time,TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window,TriggerContext ctx) throws Exception {
ctx.getPartitionedState(countState).clear();
ctx.getPartitionedState(processedState).clear();
}
@Override
public boolean canMerge() {
return true;
}
class Sum implements ReduceFunction<java.lang.Long> {
@Override
public Long reduce(Long value1,Long value2) throws Exception {
return value1 + value2;
}
}
}
我使用时较早
dataStream
.timeWindow(Time.seconds(1))
.trigger(new CountTriggerWithTimeout<TimeWindow>())
一切正常。由于需要从元素读取窗口时间,因此我开始使用EventTimeSessionWindow并在触发器中添加了canMerge()函数。从那以后,没有任何工作。 clear()永远不会被调用,onProcessingTime()和onEventTime()也不会被调用。我看到时间戳始终设置为相同的值,而与何时接收元素无关。
我的要求是在event.getThresholdInterval()中的计数> =阈值时“触发并清除”。如果count 请帮助我解决此问题。 谢谢...
解决方法
为什么不使用20秒的简单 Tumbling Windows 并计算其中的元素:
source
.keyBy("id")
.timeWindow(Time.seconds(20))
.process(new ProcessWindowFunction<Tuple2<String,Integer>,String,Tuple,TimeWindow>() {
@Override
public void process(Tuple key,ProcessWindowFunction<Tuple2<String,TimeWindow>.Context ctx,Iterable<Tuple2<String,Integer>> in,Collector<String> out) throws Exception {
if (Lists.newArrayList(in).size() >= 2) {
out.collect("Two or more elements between "
+ Instant.ofEpochMilli(ctx.window().getStart())
+ " " + Instant.ofEpochMilli(ctx.window().getEnd()));
}
}
})
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。