如何解决Apache Beam-Google数据流-WriteToBigQuery-Python-参数-模板-管道
我对自己的发展有2个问题。
问题1
我正在尝试通过python代码创建模板,该模板包括读取BigQuery表,应用一些转换并写入其他BigQuery表(可以存在或不存在)中。
重点是,我需要将目标表作为参数发送,但是看起来我无法在管道方法WriteToBigQuery中使用参数,因为它引发以下错误消息:apache_beam.error.RuntimeValueProviderError:RuntimeValueProvider(option: project_target,类型:str,默认值:'Test')。get()不在运行时上下文中调用
方法1
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Delete previous data" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery(
table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),schema=custom_options.target_schema.get(),write_disposition=BigQueryDisposition.WRITE_APPEND,create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
方法2
我创建了一个ParDo函数,以便在那里获取变量并设置WriteToBigQuery方法。但是,尽管管道执行成功完成,并且看到输出返回了行(理论上已写入),但我看不到表或在其上插入的数据。
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Pre-tasks" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery())
我尝试了2种方法,但都无效:BigQueryBatchFileLoads和WriteToBigQuery
class writeTable(beam.DoFn):
def process(self,element):
try:
#Load first here the parameters from the custom_options variable (Here we can do it)
result1 = Write(BigQueryBatchFileLoads(destination=target_table,schema=target_schema,create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
result2 = WriteToBigQuery(table=target_table,create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,method="FILE_LOADS"
)
问题2
我还有一个疑问,就是在最后一个ParDo类中,是否需要像在最后一个管道步骤中一样返回元素,result1或result2。
感谢您的帮助。
解决方法
执行此操作的最佳方法类似于#1,但不调用get
传递值提供程序,也不传递表的lambda:
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Delete previous data" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> WriteToBigQuery(
table=lambda x: custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),schema=custom_options.target_schema,write_disposition=BigQueryDisposition.WRITE_APPEND,create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
这应该有效。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。