如何解决如何使用AWS Glue作业覆盖过时的分区数据?
我每天将数据一次转储到 s3://
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase",table_name = "mydata",transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1,glueContext,"Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0,connection_type = "s3",format = "parquet",connection_options = {"path": "s3://<bucket>/mydata-transformed/","partitionKeys": ["year","month"]},transformation_ctx = "DataSink0")
job.commit()
我该怎么做才能删除当月的前一天数据,并用当前作业中的数据替换?有没有办法知道,在我的示例中,源数据中的 month = 10 分区已更改,因此我可以在进行转换之前清除输出中的相同分区并输出新的数据?
谢谢。
[编辑] 因此,似乎一种解决方案是获取作业书签,然后使用CURR_LATEST_PARTITIONS值确定在处理数据之前应删除的分区。就我而言,当我处理2020/10时,CURR_LATEST_PARTITIONS为2020/09。所以我知道要删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS为2020/09,则必须删除该数据。我不太喜欢这种解决方案,但是我认为它会起作用,而且我不确定我还能做什么。
解决方法
您有几种选择:
- DynamicFrameWriter尚不支持在S3中覆盖数据。相反,您可以使用Spark本机
MUSIC_FILE
。但是,对于非常大的数据集,由于单个工作人员将用于覆盖S3中的现有数据,因此效率可能会有些低下。下面是一个示例:
write()
- 在lambda函数中,可以在S3中使用特定前缀的删除数据。使用Python和boto3的示例是:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase",table_name = "mydata",transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
ds_df1 \
.write.mode('overwrite') \
.format('parquet') \
.partitionBy('year','month') \
.save('s3://<bucket>/mydata-transformed/')
job.commit()
- 您可以使用Glue的
import boto3 s3_res = boto3.resource('s3') bucket = 'my-bucket-name' # Add any logic to derive required prefix based on year/month/day prefix = 'mydata/year=2020/month=10/' s3_res.Bucket(bucket).objects.filter(Prefix=key).delete()
从特定前缀中删除数据。链接为here。
现在在glue中存在删除S3路径或删除glue目录表的功能。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。