Spark将时间戳从CSV转换为Parquet“本地时间”语义

如何解决Spark将时间戳从CSV转换为Parquet“本地时间”语义

考虑这个最小的Spark作业,该作业将CSV读取到DataFrame并将其写为Parquet:

val df = spark.read.format("csv").option("inferSchema",true).load(filename)
df.write.parquet("parquet_folder/")

对于输入文件中的任何时间戳列,Parquet输出将包含带有即时语义的时间戳,解释当前Spark会话/ JVM时区中源数据中的时间字符串。因此,如果我的Spark作业正在EST / EDT中运行,则“ 2020-01-01 00:00”变为“ 2020-01-01 00:00-0500”。

这意味着,除非所有Spark作业都在一个一致的时区中运行,否则我可能会有差异。

还有一个理论问题,那就是Parquet实际上并不代表我的数据。我不知道文件中的午夜是否真的是美国东部时间,PST,UTC等午夜,而且我也不在乎。

Parquet格式确实支持具有类似于java.util.LocalDateTime local time 语义的时间戳的概念-日期/时间的抽象概念,而不是特定的时间点,即不论Spark会话或JVM的时区如何,都可以一致地解释。

我想让Spark将CSV中的时间戳读取到本地时间,然后将其写入Parquet。理想情况下,我也想将其应用于日期和“无时区的时间戳”列中的Spark JDBC提取。

这有可能吗?

(注意:The Parquet format documentation解释了即时语义和本地时间语义之间的区别。)

解决方法

我遇到了同样的问题。 Spark 时间戳转换可能会令人困惑。

要使 Spark 作业针对任何主机的默认本地时区设置具有健壮性,请添加一个额外的层来临时且明确地设置 Spark 时区:

from contextlib import contextmanager
from pyspark.sql import SparkSession

@contextmanager
def spark_timezone(timezone: str):
    """Context manager to temporarily set spark timezone during context manager
    life time while preserving original timezone. This is especially
    meaningful in conjunction with casting timestamps when automatic timezone
    conversions are applied by spark.

    Please be aware that the timezone property should be adjusted during DAG
    creation and execution (including both spark transformations and actions).
    Changing the timezone while adding filter/map tasks might not be
    sufficient. Be sure to change the timezone when actually executing a spark
    action like collect/save etc.

    Parameters
    ----------
    timezone: str
        Name of the timezone (e.g. 'UTC' or 'Europe/Berlin').

    Examples
    --------
    >>> with spark_timezone("Europe/Berlin"):
    >>>     df.select(df["ts_col"].cast("timestamp")).show()

    """

    spark = get_active_spark_context()
    current = spark.conf.get("spark.sql.session.timeZone")
    spark.conf.set("spark.sql.session.timeZone",timezone)

    try:
        yield None
    finally:
        spark.conf.set("spark.sql.session.timeZone",current)


def get_active_spark_context() -> SparkSession:
    """Helper function to return the currently active spark context.

    """

    return SparkSession.builder.getOrCreate()


现在,您可以通过上下文管理器使用明确的 UTC 时区包装您的 spark.read.csv 以防止任何转换:

with spark_timezone("UTC"):
    df = spark.read.csv("path_to_file")

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-