如何解决使用DataProcPySparkOperator时无法配置GCP项目
我正在使用Cloud Composer环境在GCP项目中运行工作流程。我的工作流程之一是使用DataprocClusterCreateOperator
在不同的项目中创建一个Dataproc集群,然后尝试使用DataProcPySparkOperator
模块中的airflow.contrib.operators.dataproc_operator
将PySpark作业提交到该集群。
要创建集群,我可以指定一个project_id
参数在另一个项目中创建它,但是似乎DataProcPySparkOperator
会忽略此参数。例如,我希望能够传递project_id
,但是当任务运行时,我最终会遇到404
错误:
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
t1 = DataProcPySparkOperator(
project_id='my-gcp-project',main='...',arguments=[...],)
如何使用DataProcPySparkOperator
提交另一个项目中的工作?
解决方法
DataProcPySparkOperator
模块中的airflow.contrib.operators.dataproc_operator
在其构造函数中不接受project_id
kwarg,因此它将始终默认在Cloud Composer环境所在的项目中提交Dataproc作业。如果传递了参数,则将其忽略,这将导致在运行任务时出现404错误,因为操作员将尝试使用错误的群集路径来轮询作业。
一种解决方法是复制运算符和挂钩,然后对其进行修改以接受项目ID。但是,一个更简单的解决方案是使用airflow.providers
软件包中较新的运算符(如果您使用的是支持它们的Airflow版本),因为在较新的Airflow版本中不推荐使用许多airflow.contrib
运算符。
下面是一个示例。请注意,此模块中有一个更新的DataprocSubmitPySparkJobOperator
,但不推荐使用DataprocSubmitJobOperator
。因此,您应该使用后者,它接受项目ID。
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
t1 = DataprocSubmitJobOperator(
project_id='my-gcp-project-id',location='us-central1',job={...},)
如果您使用Composer 1.10.5 +,Airflow 1.10.6+和Python 3运行环境,则将预先安装提供程序,并可以立即使用它们。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。