如何解决如何正确使用Dataflow / Apache Beam wait_until_finish持续时间参数?
我在数据流运行程序上的apache-beam [gcp] == 2.19.0版本下在gcp的数据流中运行了一个批处理作业。我为这项工作创建了一个自定义模板。作业正在按预期运行,但我也想添加最大作业持续时间。 I found the duration (in milliseconds) parameter inside the wait_until_finish() method,which should be available。问题是:当模板化批处理作业的运行时间超过持续时间时,该如何自动使其停止?我不需要保留任何数据,我只希望作业在运行时间过长时停止。我已经实现了运行功能,如下所示:
def run():
opts = PipelineOptions()
user_options = opts.view_as(UserOptions)
p = beam.Pipeline(options=opts)
(p |
"Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,use_standard_sql=StaticValueProvider(bool,True))) |
"Get data" >> beam.ParDo(doStuff()) |
"Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |
"Write to BQ" >> beam.io.WriteToBigQuery(
table=user_options.table_spec,schema=user_options.table_schema,write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
result = p.run()
result.wait_until_finish(duration=1800000)
解决方法
否,Dataflow在一定期限后不提供自动取消功能。 您仍然可以通过简单地将cancel()放入目标即可实现
result.wait_until_finish(duration=1800000)
if not result.is_in_terminal_state(): # if pipeline isn't finished,cancel
result.cancel()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。