如何解决将CSV读入AWS Glue并与数据目录中的数据合并
我刚接触AWS Glue,想了解如何执行以下操作:
- 从AWS Glue中的URL提取CSV文件
- 将数据集与我在“数据目录”中具有的表中的列连接起来。
- 将此作为新表写回到“数据目录”。
到目前为止,我有这个:
tableA_DF = pandas.read_cv("https://example.com/file.csv")
tableB = glueContext.create_dynamic_frame.from_catalog(database=Z,table_name = Y)
tableB_DF = TableB.toDF()
但是,我不确定如何将两者结合在一起。只需将tableB的一列添加到tableA,然后将结果存储在Data目录中。
解决方法
您可以使用create_dynamic_frame_from_options
从S3直接读入Dynamicframe。
Lookupdata = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})
然后使用如下所示从目录中读取数据:
datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database,table_name = Glue_table_name,transformation_ctx = "datasource0")
然后,您可以使用join连接两个框架,然后将其写回到目录中。
[joined_dyf][1] = Join.apply(Lookupdata,Join.apply(datasource0,Lookupdata,'someidfrom_datasource0','someidfrom_Lookupdata')
现在将其写回目录,如表this所示,以获取更多信息:
sink = glueContext.getSink(connection_type="s3",path="s3://path/to/data",enableUpdateCatalog=True,updateBehavior="UPDATE_IN_DATABASE",partitionKeys=["partition_key0","partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>,catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。