如何解决kafka flink时间戳事件时间和水印
我正在阅读《使用Apache Flink进行流处理》一书,并指出“从0.10.0版本开始,Kafka支持消息时间戳。从Kafka 0.10或更高版本读取时,如果应用程序以事件时间模式运行,那么使用者将自动将消息时间戳提取为事件时间时间戳*”
因此,在processElement
函数内部,调用context.timestamp()
默认会返回kafka消息时间戳吗?
您可以请库尔提供一个简单的示例,说明如何实现AssignerWithPeriodicWatermarks / AssignerWithPunctuatedWatermarks,以根据消耗的kafka消息时间戳提取(并构建水印)。
如果我使用的是TimeCharacteristic.ProcessingTime
,则ctx.timestamp()会返回处理时间,在这种情况下,它将类似于context.timerService().currentProcessingTime()
。
谢谢。
解决方法
Flink Kafka消费者会为您解决此问题,并将时间戳记放置在需要的地方。在Flink 1.11中,您可以简单地依靠它,尽管您仍然需要提供指定乱序(或断言时间戳是有序的)的WatermarkStrategy:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
在早期版本的Flink中,您必须提供时间戳分配器的实现,如下所示:
public long extractTimestamp(Long element,long previousElementTimestamp) {
return previousElementTimestamp;
}
此版本的extractTimestamp
方法以previousElementTimestamp
的形式传递到StreamRecord中的时间戳的当前值,在这种情况下,它将是Flink Kafka使用者放置在那里的时间戳。
Flink 1.11 docs
Flink 1.10 docs
对于使用ctx.timestamp()
时TimeCharacteristic.ProcessingTime
返回的内容,在这种情况下,此方法返回NULL。 (从名义上讲,是的,好像时间戳是当前的处理时间,但这不是它的实现方式。)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。