如何解决跳过Apache Beam Pipeline中的步骤
因此,我正在构建一个Apache Beam管道,并且在跳过python SDK中的其余步骤时遇到了一些麻烦。这是我遇到麻烦的简化示例:
import apache_beam as beam
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY
def foo(message):
pass
options = {
'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[],**options)
with beam.Pipeline(runner,options=opts) as p:
sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
result = (sub_message | 'foo' >> beam.Map(foo))
result | 'print' >> beam.Map(print)
job = p.run()
if runner == 'DirectRunner':
job.wait_until_finish()
因此,根据此内容:Apache Beam - skip pipeline step在Java中,如果我的函数未返回任何内容,则apache_beam应该跳过其余步骤。如果我错了,请更正我,但是在python中,它与返回None相同,因此我的pass
可以替换为return None
并且完全相同。但是,当我使用pass
或return None
运行此代码时,结果的确会转到下一步。也就是说,当它不应该打印任何内容时,它会继续打印None
,因为它应该跳过所有后续步骤。任何帮助表示赞赏。
解决方法
很有趣,我一发布这个,我就在文档中找到了答案。因此,在我提供的链接中,看起来就像我一样使用ParDo NOT地图。所以实际上它应该看起来像这样:
import apache_beam as beam
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials
class TestFn(beam.DoFn):
def process(self,element):
print('hi')
pass
options = {
'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[],**options)
with beam.Pipeline(runner,options=opts) as p:
sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
result | 'print' >> beam.Map(print)
job = p.run()
if runner == 'DirectRunner':
job.wait_until_finish()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。