如何解决如何使用flink CEP匹配前10秒的数据?
输入数据为:
Arrays.asList(
Tuple3.of("1",100L,0),Tuple3.of("1",300L,500L,1),600L,750L,900L,1100L,1300L,2000L,4000L,6000L,8000L,10000L,10800L,10900L,11000L,11300L,0)
)
我想匹配tuple3.f2等于1的前10秒数据,期望结果是(1,500,1)到(1,10000,1),但是程序结果是:
[(1,(1,600,750,900,1100,1)]
Flink CEP模式:(Flink版本:1.11.1)
Pattern<Tuple3<String,Long,Integer>,Tuple3<String,Integer>> headSlicePattern =
Pattern.<Tuple3<String,Integer>>begin("up",AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition<Tuple3<String,Integer>>() {
private static final long serialVersionUID = 3426292175809665737L;
@Override
public boolean filter(Tuple3<String,Integer> tuple3) throws Exception {
return tuple3.f2.equals(0);
}
})
.next("middle")
.where(new IterativeCondition<Tuple3<String,Integer>>() {
private static final long serialVersionUID = -3851397850893872739L;
@Override
public boolean filter(Tuple3<String,Integer> value,Context<Tuple3<String,Integer>> ctx) throws Exception {
return value.f2.equals(1);
}
}).timesOrMore(5).consecutive().within(Time.seconds(10));
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。