如何解决Flink:将收回流写入kafka sink
任何人都可以分享工作示例以将收回流写入 kafka 接收器吗?
我尝试了以下方法但不起作用。
DataStream<Tuple2<Boolean,User>> resultStream =
tEnv.toRetractStream(result,User.class);
resultStream.addsink(new FlinkKafkaProducer(OutputTopic,new ObjSerializationSchema(OutputTopic),props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
解决方法
通常最简单的解决方案是这样做:
resultStream.map(elem -> elem.f1)
这将允许您将 User
对象写入 Kafka。
但是从业务的角度来看,这并不是那么简单,或者至少取决于用例。 Kafka 是一个 append-only 日志,retract 流代表 ADD、UPDATE 和 DELETE 操作。所以,虽然上面的解决方案允许你将数据写入 Kafka,但 Kafka 中的结果可能无法正确代表实际的计算结果,因为它们不会代表更新和删除操作。
为了能够将实际正确的计算结果写入 Kafka 您可以尝试执行以下操作之一:
- 如果您知道您的用例永远不会导致任何 DELETE 或 UPDATE 操作,那么您可以安全地使用上述解决方案。
- 如果重复可能只在一些固定的时间间隔内产生(例如记录可能只在它们产生后 1 小时后更新/删除),您可能需要使用 windows 聚合所有更新并将一个最终结果写入 Kafka .
- 最后,您可以扩展
User
类以添加一个字段,该字段标记此记录是否为收回操作,并在将数据写入 Kafka 主题时保留该信息。这意味着您必须在下游处理所有可能的 UPDATE 或 DELETE 操作(在此数据的使用者中)。
最简单的解决方案是使用 upsert-kafka 连接器作为桌面接收器。这旨在消耗一个收回流并将其写入 kafka。
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/upsert-kafka.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。