How to fix AWS Glue job error: method pyWriteDynamicFrame does not exist
My goal is to read a data frame from an existing catalog table, apply some transformations, and create a new table from the result. So, following https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html, I use the sink.writeFrame method:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_date

datasource0 = glueContext.create_dynamic_frame.from_catalog(database="my_db", table_name="table1", transformation_ctx="datasource0")
datasource1 = datasource0.toDF().withColumn("date", current_date().cast("string"))
datasource2 = DynamicFrame.fromDF(datasource1, glueContext, "datasource2")
sink = glueContext.getSink(connection_type="s3", path="s3://my_bucket/output", enableUpdateCatalog=True)
sink.setFormat("json")
sink.setCatalogInfo(catalogDatabase="my_db", catalogTableName="table2")
sink.writeFrame(datasource2)
job.commit()
But as a result I get a misleading error saying that the method pyWriteDynamicFrame does not exist:
Traceback (most recent call last):
File "/tmp/test", line 39, in <module>
sink.writeFrame(datasource1)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o75.pyWriteDynamicFrame. Trace:
py4j.Py4JException: Method pyWriteDynamicFrame([class org.apache.spark.sql.Dataset,class java.lang.String,class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Versions: Spark 2.4, Python 3, Glue 2
Solution
You can use the Glue-native Map transform class, which builds a new DynamicFrame by applying a function to every record in the input DynamicFrame.
So in your case, where you want to derive a date column, you can achieve that with the following snippet:
from datetime import datetime
from awsglue.transforms import Map

def addDate(d):
    d["date"] = datetime.today()
    return d

datasource1 = Map.apply(frame=datasource0, f=addDate)
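Because Map.apply simply invokes the supplied function on each record as a plain Python dict, the record function can be sanity-checked locally without a Glue environment. A minimal sketch (the sample record contents and the str() cast are illustrative assumptions, not from the original post):

```python
from datetime import date

def add_date(record):
    # Record function in the style of the answer's addDate:
    # Map.apply calls it once per record (a plain Python dict).
    # Casting to str keeps the new column a simple string type,
    # matching the cast("string") used in the question's code.
    record["date"] = str(date.today())
    return record

sample = {"id": 1, "name": "example"}
result = add_date(sample)
print(result)
```

Testing the function this way makes it easy to verify the shape of the output records before running the full job against the catalog.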