How to resolve an AWS Glue job inserting datetime as null into Redshift
New to AWS Glue. I am trying to insert into a Redshift table via a Glue job; an S3 crawler reads the CSV files and a crawler over the Redshift connection provides the target table schema.
The job below tries to insert the create_date value from S3 into a Redshift timestamp column, but the inserted value is always null.
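For context, a row of the source CSV looks roughly like this (hypothetical values; the dd/MM/yy HH:mm format of create_date is inferred from the fix in the solution below):

city,country,amount,create_date
Mumbai,India,1500,01/02/20 09:30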
Glue job:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir,JOB_NAME]
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
## @type: DataSource
## @args: [database = "salesdatabase",table_name = "sales_timestamp_csv",transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("city","string","city","string"),("country","string","country","string"),("amount","string","amount","string"),("create_date","string","create_date","timestamp")],transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("city","string","city","string"),("country","string","country","string"),("amount","string","amount","string"),("create_date","string","create_date","timestamp")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["country","create_date"],transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["country","create_date"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG",database = "salesdatabase",table_name = "metricsdb_public_sales_csv",transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols",transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3,choice = "make_cols",transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "salesdatabase",table_name = "metricsdb_public_sales_csv",redshift_tmp_dir = TempDir,transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
Crawler schema details:
Schema from the S3 crawler (column name: data type):
- city: string
- country: string
- amount: string
- create_date: string
Table schema from the Redshift crawler (column name: data type):
- city: string
- country: string
- amount: string
- create_date: timestamp
Any pointers would be appreciated.
Solution
I could resolve this. ApplyMapping casts the string to a timestamp, but that cast only understands ISO-style formats, so a dd/MM/yy HH:mm string silently becomes null. Just convert the DynamicFrame to a Spark DataFrame and parse the column explicitly:
from pyspark.sql.functions import to_timestamp, col
from awsglue.dynamicframe import DynamicFrame

# Convert the dynamic frame to a data frame to use standard PySpark functions
data_frame = datasource0.toDF()
data_frame.show()

# Parse the raw string into a proper timestamp; the pattern must match the format in the CSV
data_frame = data_frame.withColumn("create_date", to_timestamp(col("create_date"), "dd/MM/yy HH:mm"))
data_frame.show()

# Convert back to a dynamic frame
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, 'dynamic_frame_write')
Just pass this frame to datasink5 instead of resolvechoice4, as sketched below.
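For reference, a minimal sketch of the rewired sink, reusing the catalog names from the question (the metricsdb_public_sales_csv table name is assumed from the MATCH_CATALOG step; only the frame argument changes):

datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = dynamic_frame_write, database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")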
It worked for me