How to insert rows with null values into HBase using Spark
When I try to write a Spark dataframe that contains null column values, I get an exception about an unsupported data type.
Here is a code sample:
val sql = spark.sqlContext

case class Person(name: String, email: String, height: Float)

var personDS = Seq(Person("alice", "alice@alice.com", 4.5f), Person("bob", null, 5.1f)).toDS // <- null email value

personDS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "name STRING :key,email STRING c:email,height FLOAT p:height")
  .option("hbase.table", "test")
  .option("hbase.spark.use.hbasecontext", false)
  .option("spark.hadoop.validateOutputSpecs", false)
  .save()
The exception is:
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:155)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: unsupported data type StringType
at org.apache.spark.sql.datasources.hbase.Utils$.toBytes(Utils.scala:87)
at org.apache.hadoop.hbase.spark.HBaseRelation$$anonfun$org$apache$hadoop$hbase$spark$HBaseRelation$$convertToPut$1$2.apply(DefaultSource.scala:225)
at org.apache.hadoop.hbase.spark.HBaseRelation$$anonfun$org$apache$hadoop$hbase$spark$HBaseRelation$$convertToPut$1$2.apply(DefaultSource.scala:224)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.hadoop.hbase.spark.HBaseRelation.org$apache$hadoop$hbase$spark$HBaseRelation$$convertToPut$1(DefaultSource.scala:224)
at org.apache.hadoop.hbase.spark.HBaseRelation$$anonfun$insert$1.apply(DefaultSource.scala:231)
at org.apache.hadoop.hbase.spark.HBaseRelation$$anonfun$insert$1.apply(DefaultSource.scala:231)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442)
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
... 10 more
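The last frame points at the cause: the connector's Utils.toBytes dispatches on the runtime value of each cell, so a null matches none of the typed cases and falls through to the default branch, which reports the column's catalog type (here StringType) as unsupported. Roughly, as a paraphrase of the shape of that code (not the exact connector source):

// Paraphrased sketch of org.apache.spark.sql.datasources.hbase.Utils.toBytes,
// only to show where a null value ends up; not the exact source.
def toBytes(input: Any, field: Field): Array[Byte] = input match {
  case data: String => Bytes.toBytes(data)
  case data: Float  => Bytes.toBytes(data)
  // ... other typed cases ...
  case _ => throw new Exception(s"unsupported data type ${field.dt}") // a null lands here
}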
Of course, I could separate the Person objects that have null values from the rest, like this:
case class Person(name: String, email: String, height: Float)
case class Person_without_email(name: String, height: Float)
var personDS = Seq(Person("alice", "alice@alice.com", 4.5f)).toDS
var person_without_email_DS = Seq(Person_without_email("bob", 5.1f)).toDS
personDS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "name STRING :key,email STRING c:email,height FLOAT p:height")
  .option("hbase.table", "test").option("hbase.spark.use.hbasecontext", false).save()
person_without_email_DS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "name STRING :key,height FLOAT p:height")
  .option("hbase.table", "test").option("hbase.spark.use.hbasecontext", false).save()
This approach looks ugly, and I would have to split the original dataframe into a huge number of parts (one per null pattern, up to 2^(number of fields)). Is there a more convenient way to handle rows with null values?
HBase 2.1.0-cdh6.2.1
Spark 2.4.0-cdh6.2.1
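For reference, one way around this without splitting the dataset is to bypass the connector's row-to-Put conversion entirely and build the Puts yourself, adding a cell only when the value is non-null (HBase has no null cell values; simply not writing the column is the usual representation of "no value"). Below is a minimal sketch using the plain HBase client API inside foreachPartition; the table name test and the column families c and p are taken from the mapping above, and it assumes hbase-site.xml is on the executor classpath so HBaseConfiguration.create() can find the cluster:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

personDS.toDF().foreachPartition { rows: Iterator[Row] =>
  // One connection per partition, reused for all rows in the partition.
  val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("test"))
  try {
    rows.foreach { row =>
      val put = new Put(Bytes.toBytes(row.getAs[String]("name")))
      // Only write the email cell when it is non-null.
      Option(row.getAs[String]("email")).foreach { e =>
        put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("email"), Bytes.toBytes(e))
      }
      put.addColumn(Bytes.toBytes("p"), Bytes.toBytes("height"),
        Bytes.toBytes(row.getAs[Float]("height")))
      table.put(put)
    }
  } finally {
    table.close()
    conn.close()
  }
}

This sketch issues one RPC per row for simplicity; collecting the Puts of a partition into a list and calling table.put(list) once would be the more efficient variant.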