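How to fix AWS Glue: Column "column_name" not found in schema
I am trying to create an ETL job in AWS Glue. The use case is as follows: after the ETL job has run, a column is added to one of the source tables, and when we then try to rerun the ETL job, it fails saying that the column was not found (in the target table).
How can I make the ETL job create that column in the target table? The ETL job already has permission to create the table if it does not exist.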
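The error suggests that the JDBC sink writes only into columns that already exist in the target table and does not add new ones. One way to let the job "create the column" is to run `ALTER TABLE ... ADD COLUMN` against the target before writing. Below is a minimal sketch of generating those statements, assuming PostgreSQL/MySQL/Redshift-style `ADD COLUMN` syntax (SQL Server, for example, omits the `COLUMN` keyword). `build_add_column_ddl`, `CATALOG_TO_SQL` and `new_column` are hypothetical names, not Glue APIs, and executing the statements over the database connection is not shown.

```python
# Hypothetical mapping from Glue Data Catalog type names to SQL column types;
# adjust it to the target database's dialect and size requirements.
CATALOG_TO_SQL = {
    "string": "VARCHAR(256)",
    "int": "INTEGER",
    "bigint": "BIGINT",
    "double": "DOUBLE PRECISION",
    "boolean": "BOOLEAN",
}


def build_add_column_ddl(table, source_schema, target_columns):
    """Return ALTER TABLE statements for source columns missing from the target.

    source_schema: dict of column name -> Glue catalog type name (e.g. "string").
    target_columns: iterable of column names that already exist in the target.
    """
    existing = {c.lower() for c in target_columns}
    statements = []
    for name, catalog_type in source_schema.items():
        if name.lower() in existing:
            continue  # column is already present in the target table
        sql_type = CATALOG_TO_SQL.get(catalog_type.lower())
        if sql_type is None:
            raise ValueError(f"no SQL type mapped for {name!r} ({catalog_type})")
        # No quoting/escaping: assumes trusted, simple identifiers.
        statements.append(f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}")
    return statements


if __name__ == "__main__":
    ddl = build_add_column_ddl(
        "target_table_xy",
        {"column_1": "int", "column_2": "string", "new_column": "string"},
        ["column_1", "column_2"],
    )
    for stmt in ddl:
        print(stmt)  # ALTER TABLE target_table_xy ADD COLUMN new_column VARCHAR(256)
```

Names are compared case-insensitively because PostgreSQL and Redshift fold unquoted identifiers to lower case; an unmapped type raises instead of guessing a column type.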
Example:
Source tables:
Table X: column_1,column_2
Table Y: column_1,column_3,column_4
The ETL job is configured to combine the two into
Table_XY: column_1,column_2,column_4
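To make the example concrete, combining X and Y can be pictured as an inner join on `column_1`, which is what the script's `Join.apply` step does with `keys1 = ['column_1']` and `keys2 = ['column_1']`. This plain-Python sketch (no Glue involved; rows as dicts, made-up sample values, hypothetical helper `inner_join`) shows which columns a joined row carries. Note that the join itself yields `column_3` as well, while the Table_XY column list above does not include it.

```python
def inner_join(left, right, key):
    """Inner-join two lists of dict rows on a shared key column.

    If a non-key column name appears on both sides, the right-hand value wins.
    """
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for lrow in left:
        for rrow in index.get(lrow[key], []):
            merged = dict(lrow)
            merged.update(rrow)
            joined.append(merged)
    return joined


# Made-up sample rows shaped like the example tables X and Y.
table_x = [{"column_1": 1, "column_2": "a"}, {"column_1": 2, "column_2": "b"}]
table_y = [{"column_1": 1, "column_3": "c", "column_4": 10}]

rows = inner_join(table_x, table_y, "column_1")
print(rows)
# [{'column_1': 1, 'column_2': 'a', 'column_3': 'c', 'column_4': 10}]
```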
Up to this point, it works perfectly.
Now suppose Table Y is modified as follows:
Table Y: column_1,column_4,**column_5**
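I then rerun the crawler (which detects the columns in the source),
and then rerun the ETL job, which fails with the following error message:
Column "column_5" not found in schema
How can I fix this?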
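The failure appears once the frame being written carries a column the target table lacks. A small sketch for spotting that before the write: compare the column names the job will write with the target's columns. The lists are hard-coded here for illustration; in a real job they might come from something like `frame.toDF().columns` and from the database's metadata, which this sketch does not do. `diff_columns` is a hypothetical helper.

```python
def diff_columns(outgoing, target):
    """Compare outgoing column names with the target table's columns.

    Returns (missing_in_target, absent_from_outgoing), both sorted.
    Comparison is case-insensitive; names are reported in lower case.
    """
    out = {c.lower() for c in outgoing}
    tgt = {c.lower() for c in target}
    return sorted(out - tgt), sorted(tgt - out)


# Columns after the change to Table Y, joined with Table X (illustrative).
outgoing = ["column_1", "column_2", "column_4", "column_5"]
target = ["column_1", "column_2", "column_4"]

missing, unused = diff_columns(outgoing, target)
print(missing)  # ['column_5']
print(unused)   # []
```

A non-empty `missing` list is exactly the situation behind the error: either those columns are added to the target first, or they are dropped from the frame before the sink.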
Update with the Glue script:
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_x", transformation_ctx = "datasource0")
## @type: DataSource
## @args: [database = "db_source", table_name = "sourc_table_y", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_source", table_name = "sourc_table_y", transformation_ctx = "datasource1")
## @type: Join
## @args: [keys1 = ['column_1'], keys2 = ['column_1']]
## @return: join2
## @inputs: [frame1 = datasource0, frame2 = datasource1]
join2 = Join.apply(frame1 = datasource0, frame2 = datasource1, keys1 = ['column_1'], keys2 = ['column_1'], transformation_ctx = "join2")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = join2]
resolvechoice2 = ResolveChoice.apply(frame = join2, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "my-db-connection", connection_options = {"dbtable": "target_table_xy", "database": "db_target"}, transformation_ctx = "datasink4")
job.commit()
```
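If the new source columns do not need to reach the target, one possible workaround is to project the frame onto the target table's columns right before `datasink4`. Glue provides a `SelectFields` transform (`SelectFields.apply(frame=..., paths=[...])`) for this. The sketch below only shows, in plain Python over dict rows, what that projection does; `project_rows` is a hypothetical helper and the target column list is hard-coded as an assumption rather than read from the database.

```python
def project_rows(rows, keep):
    """Keep only the listed columns in each row, dropping anything else.

    Mirrors what a field-selection step before the JDBC sink would do;
    columns in `keep` that a row lacks are simply omitted.
    """
    keep = list(keep)
    return [{c: row[c] for c in keep if c in row} for row in rows]


# Assumed target columns for target_table_xy (hard-coded for illustration).
TARGET_COLUMNS = ["column_1", "column_2", "column_4"]

rows = [{"column_1": 1, "column_2": "a", "column_4": 10, "column_5": "new"}]
print(project_rows(rows, TARGET_COLUMNS))
# [{'column_1': 1, 'column_2': 'a', 'column_4': 10}]

# In the Glue script, the equivalent step would sit before the sink, e.g.:
# selected = SelectFields.apply(frame=dropnullfields3, paths=TARGET_COLUMNS,
#                               transformation_ctx="selectfields")
```

This avoids the error by silently discarding `column_5`; it does not add the column to the target. Getting the column into the target still requires altering the target table.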