如何解决Spark将时间戳从CSV转换为Parquet“本地时间”语义
考虑这个最小的Spark作业,该作业将CSV读取到DataFrame并将其写为Parquet:
val df = spark.read.format("csv").option("inferSchema",true).load(filename)
df.write.parquet("parquet_folder/")
对于输入文件中的任何时间戳列,Parquet输出将包含带有即时语义的时间戳,解释当前Spark会话/ JVM时区中源数据中的时间字符串。因此,如果我的Spark作业正在EST / EDT中运行,则“ 2020-01-01 00:00”变为“ 2020-01-01 00:00-0500”。
这意味着,除非所有Spark作业都在一个一致的时区中运行,否则我可能会有差异。
还有一个理论问题,那就是Parquet实际上并不代表我的数据。我不知道文件中的午夜是否真的是美国东部时间,PST,UTC等午夜,而且我也不在乎。
Parquet格式确实支持具有类似于java.util.LocalDateTime
的 local time 语义的时间戳的概念-日期/时间的抽象概念,而不是特定的时间点,即不论Spark会话或JVM的时区如何,都可以一致地解释。
我想让Spark将CSV中的时间戳读取到本地时间,然后将其写入Parquet。理想情况下,我也想将其应用于日期和“无时区的时间戳”列中的Spark JDBC提取。
这有可能吗?
(注意:The Parquet format documentation解释了即时语义和本地时间语义之间的区别。)
解决方法
我遇到了同样的问题。 Spark 时间戳转换可能会令人困惑。
要使 Spark 作业针对任何主机的默认本地时区设置具有健壮性,请添加一个额外的层来临时且明确地设置 Spark 时区:
from contextlib import contextmanager
from pyspark.sql import SparkSession
@contextmanager
def spark_timezone(timezone: str):
"""Context manager to temporarily set spark timezone during context manager
life time while preserving original timezone. This is especially
meaningful in conjunction with casting timestamps when automatic timezone
conversions are applied by spark.
Please be aware that the timezone property should be adjusted during DAG
creation and execution (including both spark transformations and actions).
Changing the timezone while adding filter/map tasks might not be
sufficient. Be sure to change the timezone when actually executing a spark
action like collect/save etc.
Parameters
----------
timezone: str
Name of the timezone (e.g. 'UTC' or 'Europe/Berlin').
Examples
--------
>>> with spark_timezone("Europe/Berlin"):
>>> df.select(df["ts_col"].cast("timestamp")).show()
"""
spark = get_active_spark_context()
current = spark.conf.get("spark.sql.session.timeZone")
spark.conf.set("spark.sql.session.timeZone",timezone)
try:
yield None
finally:
spark.conf.set("spark.sql.session.timeZone",current)
def get_active_spark_context() -> SparkSession:
"""Helper function to return the currently active spark context.
"""
return SparkSession.builder.getOrCreate()
现在,您可以通过上下文管理器使用明确的 UTC 时区包装您的 spark.read.csv
以防止任何转换:
with spark_timezone("UTC"):
df = spark.read.csv("path_to_file")
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。