如何解决是否有更好的方法使用Python处理Spark / AWS-Glue中的PostgreSQL Hstore
我正在使用AWS Glue对带有许多动态Hstore字段的PostgreSQL中存储的数据执行ETL。我需要使用Hstore中的某些字段来执行操作。
让我介绍一下我的操作方式,以便您可以选择其他方法来帮助我或做得更好。
-
数据从“胶水目录”加载到DaynamicFrame
-
我将DynamicFrame转换为Spark DataFrame进行一些类似SQL的操作(我无法使用DyanicFrame,因为我需要一些分组,汇总和排名)
-
Glue目录不支持Hstore并将列作为字符串加载。
-
使用spark,我将Hstore字符串转换为JSON字符串,然后使用Spark from_json将列加载为地图类型。
import pyspark.sql.functions as F import pyspark.sql.types as T df = dynamicFrame.toDF() df = df.withColumn("column_from_hstore",F.concat(F.lit("{"),F.col("column"),F.lit("}"))) df = df.withColumn("column_json",F.regexp_replace(F.col("column_from_hstore"),'=>',":")) df = df.withColumn("column_map",F.from_json(F.col("column_json"),T.MapType(T.StringType(),T.StringType()))) df = df.withColumn("column_child",F.col("column_map.child").cast('int'))
最后,我可以使用子字段。
我有其他选择
我可以将spark直接连接到Postgres并使用原始SQL加载数据并选择所需的字段,但这需要我在Glue上管理JDBC连接凭据。我找不到简单的方法。
问题
由于我是新手,所以我不知道这样做的效果如何,或者是否有更好的方法。感谢您的帮助
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。