How to UPDATE a table in a SQL Server database with data from Hive using Spark
I have my master table in SQL Server, and I want to update a few of its columns based on a condition that 3 columns match between my master table (in the SQL Server DB) and the target table (in Hive). Both tables have many columns, but I am only interested in the 6 columns below:
The 3 columns I want to update in the master table are
"INSPECTED_BY","INSPECTION_COMMENTS" and "SIGNED_BY"
The columns I want to use as the match condition are
"SERVICE_NUMBER","PART_ID" and "LOTID"
I tried the following code, but it gives me a NullPointerException:
val df = spark.table("location_of_my_table_in_hive")
df.show(false)
df.foreachPartition(partition => {
  val Connection = DriverManager.getConnection(SQLjdbcURL, SQLusername, SQLPassword)
  val batch_size = 100
  var psmt: PreparedStatement = null
  partition.grouped(batch_size).foreach(batch => {
    batch.foreach { row =>
      val inspctbyIndex = row.fieldIndex("INSPECTED_BY")
      val inspctby = row.getString(inspctbyIndex)
      val inspcomIndex = row.fieldIndex("INSPECT_COMMENTS")
      val inspcom = row.getString(inspcomIndex)
      val signIndex = row.fieldIndex("SIGNED_BY")
      val signby = row.getString(signIndex)
      val sqlquery = "MERGE INTO SERVICE_LOG_TABLE as LOG" +
        "USING (VALUES(?,?,?))" +
        "AS ROW(inspctby,inspcom,signby)" +
        "ON LOG.SERVICE_NUMBER = ROW.SERVICE_NUMBER and LOG.PART_ID = ROW.PART_ID and LOG.LOTID = ROW.LOTID" +
        "WHEN MATCHED THEN UPDATE SET INSPECTED_BY = 'SMITH',INSPECT_COMMENTS = 'STANDARD_MET',SIGNED_BY = 'WILL'" +
        "WHEN NOT MATCHED THEN INSERT VALUES(ROW.INSPECTED_BY,ROW.INSPECT_COMMENTS,ROW.SIGNED_BY)"
      var psmt: PreparedStatement = Connection.prepareStatement(sqlquery)
      psmt.setString(1, inspctby)
      psmt.setString(2, inspcom)
      psmt.setString(3, signby)
      psmt.addBatch()
    }
    psmt.executeBatch()
    Connection.commit()
    psmt.close()
  })
  Connection.close()
})
This is the error:
ERROR scheduler.TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4
times,most recent failure: Lost task 0.3 in stage 2.0 (TID 9,lwtxa0gzpappr.corp.bankofamerica.com,executor 4): java.lang.NullPointerException
at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:101)
at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at $anonfun$1.apply(/location/service_log.scala:74)
at $anonfun$1.apply(/location/service_log.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I searched the Internet and could not find out why this error occurs. Any help would be greatly appreciated.
Solution
If you are running this on a Spark cluster, I think you may have to broadcast some objects. The executors cannot get the value of those objects, hence the NullPointerException.
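That hint aside, the stack trace matches something visible in the posted code itself: the inner "var psmt" declared inside batch.foreach shadows the outer "var psmt: PreparedStatement = null", so the psmt.executeBatch() call after the inner loop runs against the outer variable, which is still null. Two more problems would surface once that is fixed: the concatenated MERGE string has no spaces between fragments (it reads "...as LOGUSING (VALUES..."), and the VALUES row supplies only the three update columns even though the ON clause references ROW.SERVICE_NUMBER, ROW.PART_ID and ROW.LOTID. Below is a minimal sketch of one way to restructure the loop. It is an untested illustration, not a drop-in fix: it assumes the Hive table also carries the three match columns, that SQLjdbcURL, SQLusername and SQLPassword are defined on the driver, and it keeps the code's column name INSPECT_COMMENTS (the question's prose says INSPECTION_COMMENTS, so use whichever your schema actually has):

import java.sql.{DriverManager, PreparedStatement}
import org.apache.spark.sql.Row

val df = spark.table("location_of_my_table_in_hive")

df.foreachPartition { (partition: Iterator[Row]) =>
  val connection = DriverManager.getConnection(SQLjdbcURL, SQLusername, SQLPassword)
  // The explicit commit() below is rejected by JDBC while autocommit is on
  connection.setAutoCommit(false)

  // Note the trailing space on every fragment, and that all six columns
  // are bound as parameters so the ON clause can actually see them.
  val sqlquery =
    "MERGE INTO SERVICE_LOG_TABLE AS LOG " +
      "USING (VALUES (?,?,?,?,?,?)) " +
      "AS ROW(SERVICE_NUMBER, PART_ID, LOTID, INSPECTED_BY, INSPECT_COMMENTS, SIGNED_BY) " +
      "ON LOG.SERVICE_NUMBER = ROW.SERVICE_NUMBER AND LOG.PART_ID = ROW.PART_ID AND LOG.LOTID = ROW.LOTID " +
      "WHEN MATCHED THEN UPDATE SET INSPECTED_BY = ROW.INSPECTED_BY, " +
      "INSPECT_COMMENTS = ROW.INSPECT_COMMENTS, SIGNED_BY = ROW.SIGNED_BY " +
      "WHEN NOT MATCHED THEN INSERT (SERVICE_NUMBER, PART_ID, LOTID, INSPECTED_BY, INSPECT_COMMENTS, SIGNED_BY) " +
      "VALUES (ROW.SERVICE_NUMBER, ROW.PART_ID, ROW.LOTID, ROW.INSPECTED_BY, ROW.INSPECT_COMMENTS, ROW.SIGNED_BY);"

  // Prepare ONCE per partition instead of once per row, and never shadow
  // this variable: it is the one executeBatch() is called on.
  val psmt: PreparedStatement = connection.prepareStatement(sqlquery)
  val batchSize = 100

  partition.grouped(batchSize).foreach { batch =>
    batch.foreach { row =>
      psmt.setString(1, row.getAs[String]("SERVICE_NUMBER"))
      psmt.setString(2, row.getAs[String]("PART_ID"))
      psmt.setString(3, row.getAs[String]("LOTID"))
      psmt.setString(4, row.getAs[String]("INSPECTED_BY"))
      psmt.setString(5, row.getAs[String]("INSPECT_COMMENTS"))
      psmt.setString(6, row.getAs[String]("SIGNED_BY"))
      psmt.addBatch()
    }
    psmt.executeBatch()
    connection.commit()
  }

  psmt.close()
  connection.close()
}

Preparing the statement once per partition also avoids re-parsing the MERGE for every record, and closing it only after all batches means the second batch no longer works against a statement that was closed by the first iteration, as in the original loop.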