如何进行SparkSQL与Hive metastore Parquet转换的分析

如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

Spark SQL为了更好的性能,在读写Hive metastore parquet格式的表时,会默认使用自己的Parquet SerDe,而不是采用Hive的SerDe进行序列化和反序列化。该行为可以通过配置参数spark.sql.hive.convertMetastoreParquet进行控制,默认true。

这里从表schema的处理角度而言,就必须注意Hive和Parquet兼容性,主要有两个区别:

1.Hive是大小写敏感的,但Parquet相反

2.Hive会将所有列视为nullable,但是nullability在parquet里有独特的意义

由于上面的原因,在将Hive metastore parquet转化为Spark SQL parquet时,需要兼容处理一下Hive和Parquet的schema,即需要对二者的结构进行一致化。主要处理规则是:

1.有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持Parquet侧的数据类型,这样就可以处理到nullability类型了(空值问题)

2.兼容处理的schema应只包含在Hive元数据里的schema信息,主要体现在以下两个方面:

(1)只出现在Parquet schema的字段会被忽略

(2)只出现在Hive元数据里的字段将会被视为nullable,并处理到兼容后的schema中
关于schema(或者说元数据metastore),Spark SQL在处理Parquet表时,同样为了更好的性能,会缓存Parquet的元数据信息。此时,如果我们直接通过Hive或者其他工具对该Parquet表进行修改导致了元数据的变化,那么Spark SQL缓存的元数据并不能同步更新,此时需要手动刷新Spark SQL缓存的元数据,来确保元数据的一致性,方式如下:
// 第一种方式应用的比较多1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")2. sparkSession.catalog.refreshByPath(s"${path}")

最后说一下最近后台小伙伴在生产中遇到的一个问题,大家如果在业务处理中遇到类似的问题,提供一个思路。

在说问题之前首先了解一个参数spark.sql.parquet.writeLegacyFormat(默认false)的作用:

设置为true时,数据会以Spark1.4和更早的版本的格式写入。比如decimal类型的值会被以Apache Parquet的fixed-length byte array格式写出,该格式是其他系统例如Hive、Impala等使用的。            
设置为false时,会使用parquet的新版格式。例如,decimals会以int-based格式写出。如果Spark SQL要以Parquet输出并且结果会被不支持新格式的其他系统使用的话,需要设置为true。  

比如,对于decimal数据类型的兼容处理,不设置true时,经常会报类似如下的错误:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)  at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)  at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)  at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)  at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)...  Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary  at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)...

此时我们需要将spark.sql.parquet.writeLegacyFormat设置为true来解决上述的异常问题。

但如果同时设置spark.sql.hive.convertMetastoreParquet为false时,要注意一些数据类型以及精度的处理,比如对于decimal类型的处理。通过一个例子复原一下当时的场景:

1.创建Hive外部表testdb.test_decimal,其中字段fee_rate为decimal(10,10)

CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING ,            `fee_rate` DECIMAL(10,10)) PARTITIONED BY (`dt` STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://hadoop/data/test_decimal' TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;

2.将testdb.item中的数据处理后保存到testdb.test_decimal中

// 这里为了展示方便,直接查询testdb.item中的数据// 注意: 字段fee_rate的类型为decimal(10,6)select no, fee_rate from testdb.item  where dt=20190528;
// testdb.item中数据示例如下+-------------------+----------------+|                 no|       fee_rate|+-------------------+----------------+|                  1|        0.000000||                  2|        0.000000||                  3|        0.000000|+-------------------+----------------+
3.将testdb.item中的数据保存到testdb.test_decimal中  
// tmp是上述查询testdb.item获得的临时表// 以parquet格式保存到test_decimal的20200529分区中save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`; msck repair TABLE testdb.item;

上述1-3都能成功执行,数据也能保存到testdb.test_decimal中,但是当查询testdb.test_decimal中的数据时,比如执行sql:

select * from testdb.test_decimal where dt = 20200529;
会报如下空指针的异常:
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException  at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)  ...

究其原因是因为按照上述两个参数的配置,testdb.item中fee_rate字段类型为decimal(10,6),数据为0.000000,经过一系列处理0.000000最终会被处理为0,看下边最终导致空指针异常的部分,就会一目了然。

public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) {        if (bd == null) {            return null;        } else {            bd = trim(bd);            if (bd.scale() > maxScale) {                bd = bd.setScale(maxScale, RoundingMode.HALF_UP);            }            // testdb.test_decimal中fee_rate的类型decimal(10,10),即precision为10,scale也为10            // 对应这里即maxPrecision和maxScale分别为10,则maxIntDigits为0            int maxIntDigits = maxPrecision - maxScale;                        // bd对应0。对于0而言,precision为1,scale为0            // 处理之后 intDigits为1            int intDigits = bd.precision() - bd.scale();            return intDigits > maxIntDigits ? null : bd;        }}

解决办法也很简单,就是将testdb.test_decimal中的fee_rate数据类型和依赖的表testdb.item中的fee_rate保持完全一致,即也为decimal(10,6)。

这个现象在实际应用环境中经常遇到,通用的解决办法就是将要保存的表中的数据类型与依赖的表(物理表或者临时表)的字段类型保持完全一致。

看完上述内容,你们掌握如何进行SparkSQL与Hive metastore Parquet转换的分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程之家行业资讯频道,感谢各位的阅读!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展API用来支持高吞吐、高容错的处理流式数据数据源可以是:Kafka、TCPsockets、Flume、Twitter等流式数据源处理数据:可以用SparkCore的算子map、reduce、join、window
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“S...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数据分析”,在日常操作中,相信很多人在TSDB的数据怎么利用Hadoop/spark集群做数据分析问题上存在疑惑...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
小编给大家分享一下Hadoop和Spark有什么不同,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们
这篇文章主要讲解了“Hadoop和Spark的Shuffle过程有什么不同”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习...
本篇文章给大家分享的是有关基于CDP7.1.1的Spark3.0技术预览版本分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获...
这篇文章主要介绍“Spark中foreachRDD、foreachPartition和foreach的区别是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的...
本篇内容主要讲解“spark的动态分区裁剪怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark的动态分...
本篇内容介绍了“spark的动态分区裁剪下物理计划怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下
这篇文章给大家介绍基于Spark和TensorFlow 的机器学习实践是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。EMR E-Learning平台...
这篇文章将为大家详细讲解有关如何进行EMR Spark-SQL性能极致优化的分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识...
如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决...
如何浅析Hive和Spark SQL读文件时的输入任务划分,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个...
这篇文章将为大家详细讲解有关Hive on Spark参数如何调优,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言Hive on Spa...
这篇文章将为大家详细讲解有关fs.defaultFS变更使spark-sql查询hive失败是怎么回事,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以...
这篇文章将为大家详细讲解有关怎么解析SparkCore和SparkSQL,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解...
怎么快速搭建Spark开发环境,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一,搭建本...