如何使用AWS Glue作业覆盖过时的分区数据?

如何解决如何使用AWS Glue作业覆盖过时的分区数据?

我每天将数据一次转储到 s3:// /mydata/year=*/month=*/*.snappy.parquet 作为该月的累积数据。我有一个用于对其进行爬网以更新 mydata 表的爬网程序,以及一个CW规则,该规则在爬网程序成功后会调用lambda,从而启动Glue作业以转换列并将其输出到 s3:// / mydata-transformed / year = * / month = * / *。snappy.parquet 。此流程基本上有效。但是,我目前遇到的问题是输出数据被累加写入,而不是替换那里的数据(因为它是该月的累积数据)。例如,假设在2020年10月1日午夜,将10/1的数据转储到 s3:// /mydata/year=2020/month=10/*.snappy.parquet 。该流将在 s3:// /mydata-transformed/year=2020/month=10/*.snappy.parquet 中生成转换后的数据,所有数据都适合10/1数据。但是,第二天将10/1和10/2的数据转储到 s3:// /mydata/year=2020/month=10/*.snappy.parquet 中(覆盖前一天的文件),Glue作业将在输出文件夹中生成附加数据,即它将包含昨天运行的数据以及今天运行的数据(因此10/1数据两次,而10/2数据)。第二天是10/1数据3X,10/2数据2X和10/3数据。等等。可以确定2020/09及之前的数据,因为它们没有变化。下面是我的代码的基本结构,其中删除了样板代码,并由人为的代码替换了真实的转换。

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase",table_name = "mydata",transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1,glueContext,"Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0,connection_type = "s3",format = "parquet",connection_options = {"path": "s3://<bucket>/mydata-transformed/","partitionKeys": ["year","month"]},transformation_ctx = "DataSink0")
job.commit()

我该怎么做才能删除当月的前一天数据,并用当前作业中的数据替换?有没有办法知道,在我的示例中,源数据中的 month = 10 分区已更改,因此我可以在进行转换之前清除输出中的相同分区并输出新的数据?

谢谢。

[编辑] 因此,似乎一种解决方案是获取作业书签,然后使用CURR_LATEST_PARTITIONS值确定在处理数据之前应删除的分区。就我而言,当我处理2020/10时,CURR_LATEST_PARTITIONS为2020/09。所以我知道要删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS为2020/09,则必须删除该数据。我不太喜欢这种解决方案,但是我认为它会起作用,而且我不确定我还能做什么。

解决方法

您有几种选择:

  1. DynamicFrameWriter尚不支持在S3中覆盖数据。相反,您可以使用Spark本机MUSIC_FILE。但是,对于非常大的数据集,由于单个工作人员将用于覆盖S3中的现有数据,因此效率可能会有些低下。下面是一个示例:
write()
  1. 在lambda函数中,可以在S3中使用特定前缀的删除数据。使用Python和boto3的示例是:
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'],args)
    
    DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase",table_name = "mydata",transformation_ctx = "DataSource0")
    ds_df = DataSource0.toDF()
    ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
    ds_df1 \
        .write.mode('overwrite') \
        .format('parquet') \
        .partitionBy('year','month') \
        .save('s3://<bucket>/mydata-transformed/')
    
    job.commit()
  1. 您可以使用Glue的 import boto3 s3_res = boto3.resource('s3') bucket = 'my-bucket-name' # Add any logic to derive required prefix based on year/month/day prefix = 'mydata/year=2020/month=10/' s3_res.Bucket(bucket).objects.filter(Prefix=key).delete() 从特定前缀中删除数据。链接为here
,

现在在glue中存在删除S3路径或删除glue目录表的功能。

AWS GLue doc

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-