如何解决使用Spark Streaming解析来自Kafka的Json数据,解析并存储到Kudu,java.lang.NullPointerException
Kafka Json数据:不规则的JSON exp:{"request_body":[{"a":"a_value","b","b_value" ...}]}
自动实现将JSON的关键字作为表字段,然后自动添加与字段相关的功能,目前没有问题。只有将已处理的JSON字符串存储到kudu的事件表中的功能才会出现此问题
object KafkaToKudu {
val kuduMasters: String = System.getProperty("kuduMasters","*.*.*.15:7051,*.*.*.16:7051,*.*.*.17:7051") //tDataAnalysis01:7051,tDataAnalysis02:7051,tDataAnalysis03:7051")
val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas",1)
var Topic = "MiTest"
val spark = SparkSession.builder.appName("KuduSparkc").master("local[*]").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))
val Kuduclient = new KuduClient.KuduClientBuilder(kuduMasters).build()
val logger = LoggerFactory.getLogger(KafkaToKudu.getClass)
val kuduContextc = new KuduContext(kuduMasters,spark.sqlContext.sparkContext)
import spark.implicits._
def getNewJsonData(data:Map[String,Any]): String = {
// proccessign data to datas (add Key "Id" )
return datas
}
def createId(orid: String,dId: String): String = {
return UUID.randomUUID().toString
}
import java.time.{Instant,ZoneId,ZonedDateTime}
def main(args: Array[String]): Unit = {
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "*.*.*.15:9092,*.*.*.16:9092,*.*.*.17:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "separate_id_for_each_stream","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array(Topic)
val kafkaStream = KafkaUtils.createDirectStream(
ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)
)
kafkaStream.foreachRDD(rdd => {
rdd.foreachPartition(partitions => {
implicit val formats = Serialization.formats(NoTypeHints)
partitions.foreach(row => {
val rows = row.value()
val ab = parse(rows).extract[Map[String,Any]].get("request_body")
try {
val ap = parse(ab.get.toString).extract[List[Map[String,Any]]]
for (acc <- ap) {
var datas = acc
val yps = datas.get("type")
val tp = yps.get.asInstanceOf[String]
tp match {
case "track" => {
val newdata:String=getNewJsonData(datas)
try{
val sp = SparkSession.builder.config(spark.sparkContext.getConf).getOrCreate().sqlContext
import sp.implicits._
val dd=sp.createDataset(Seq(newdata)) ///===**java.lang.NullPointerException**
val df= sp.read.json(dd)
kuduContextc.insertRows(df,"events")
}catch {
case e: Exception => {
println(e)
}}
}
case _ =>{
println(tp)
}
}
}
} catch {
case e: Exception => {
println(e)
}
}
})
})
})
ssc.start()
ssc.awaitTermination()
}
}
代码运行到val dd=sp.createDataset(Seq(newdata))
我收到了 java.lang.NullPointerException
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。