如何解决FlinkRunner上的ApacheBeam无法从Kafka中读取
我正在尝试运行由本地Flink集群支持的Apache Beam,以便从Documentation for ReadFromKafka中进行介绍来从Kafka主题中消费。
代码基本上就是该管道以及Beam Examples
中所述的其他设置 with beam.Pipeline() as p:
lines = p | ReadFromKafka(
consumer_config={'bootstrap.servers': bootstrap_servers},topics=[topic],) | beam.WindowInto(beam.window.FixedWindows(1))
output = lines | beam.FlatMap(lambda x: print(x))
output | WriteToText(output)
由于我尝试在Flink上运行,因此我遵循了doc for Beam on Flink并执行了以下操作:
->我下载了flink 1.10的二进制文件,并遵循了instructions to proper setup the cluster。
我检查了服务器和任务实例的日志。两者都已正确初始化。
->使用docker启动kafka并将其暴露在端口9092中。
->在终端中执行以下操作
python example_1.py --runner FlinkRunner --topic myTopic --bootstrap_servers localhost:9092 --flink_master localhost:8081 --output output_folder
终端输出
2.23.0: Pulling from apache/beam_java_sdk Digest: sha256:3450c7953f8472c2312148a2a8324b0115fd71b3a7a01a3b017f6de69d89dfe1 Status: Image is up to date for apache/beam_java_sdk:2.23.0 docker.io/apache/beam_java_sdk:2.23.0
但是在将一些消息写入myTopic之后,终端仍然保持冻结状态,并且在输出文件夹中看不到任何内容。我检查了flink-conf.yml并给出了这两行
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
我假设作业的端口将是6123,而不是Beam文档中指定的8081,但是两个端口的行为是相同的。
我对Beam / Flink还是很陌生,所以我不确定它是否可以。到目前为止,我有两个假设,但还不太清楚如何进行调查:
- 与Beam与Flink通信以发送作业的端口有关的事物。
2。apache.beam.io.external.ReadFromKafka文档中提到的Python SDK扩展服务
Note: To use these transforms,you need to start a Java Expansion Service. Please refer to the portability documentation on how to do that. Flink Users can use the built-in Expansion Service of the Flink Runner’s Job Server. The expansion service address has to be provided when instantiating the transforms.
但是阅读可移植性文档后,它又使我回到了相同的doc for Beam on Flink。
有人可以帮帮我吗?
编辑:我正在使用Debezium Source Connector for PostgreSQL编写该主题,并看到上述行为。但是当我手动尝试该主题时,应用程序崩溃并显示以下内容
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
解决方法
您做的一切正确; Java扩展服务不再需要手动启动(请参见latest docs)。此外,Flink在8081上提供了Web UI,但也在那里接受了作业提交,因此两个端口都可以正常工作。
似乎您可能遇到了Python的TextIO不yet support streaming的问题。
此外,还有一个复杂性,当在Flink上运行Python管道时,实际代码在docker映像中运行,因此,如果您尝试写入“本地”文件,它将是映像中的文件,而不是在您的计算机上。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。