如何解决PySpark-使用本机Spark函数将长时长毫秒转换为TimestampType
我正在使用PySpark库读取JSON文件,处理数据并写回实木复合地板文件。
传入数据的日期字段以纪元为单位,以毫秒为单位。例如,1541106106796
代表:Thursday,November 1,2018 9:01:46.796 PM
。
有效的解决方案使用Python datetime
库:
def format_datetime(ts):
return datetime.fromtimestamp(ts/1000.0)
...
get_timestamp = udf(lambda x: format_datetime(int(x)),TimestampType())
df = df.withColumn("timestamp",get_timestamp(df.ts))
是否存在仅使用本机Spark函数的解决方案?
解决方法
使用 from_unixtime
并从时间戳中提取毫秒,然后在末尾添加,最后转换为 timestamp
类型
df.show()
#+-------------+
#| ts|
#+-------------+
#|1541106106796|
#+-------------+
df.withColumn("ts1",expr('concat_ws(".",from_unixtime(substring(ts,1,length(ts)-3),"yyyy-MM-dd HH:mm:ss"),substring(ts,length(ts)-2,length(ts)))').cast("timestamp")).\
show(10,False)
#+-------------+-----------------------+
#|ts |ts1 |
#+-------------+-----------------------+
#|1541106106796|2018-11-01 16:01:46.796|
#+-------------+-----------------------+
要创建 unixtime
,请使用unix_timestamp
和regexp_extract
函数。
Example:
df.show(10,False)
#+-----------------------------------------+
#|sample |
#+-----------------------------------------+
#|Thursday,November 1,2018 9:01:46.796 PM|
#+-----------------------------------------+
df.withColumn("ts",concat_ws('',unix_timestamp(col("sample"),"E,MMMM d,yyyy hh:mm:ss.SSS a"),regexp_extract(col("sample"),"\\.(.*)\\s+",1))).\
show(10,False)
#+-----------------------------------------+-------------+
#|sample |ts |
#+-----------------------------------------+-------------+
#|Thursday,2018 9:01:46.796 PM|1541124106796|
#+-----------------------------------------+-------------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。