How to parse dynamic JSON with dynamic keys in Scala


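I'm trying to parse a JSON structure that is dynamic in nature and load it into a database, but I'm having difficulty because the JSON has dynamic keys. I tried the `explode` function, but it didn't help (see also: How to parse a dynamic JSON key in a Nested JSON result?). Below is my sample JSON: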

    {
        "_id": {
            "planId": "5f34dab0c661d8337097afb9",
            "version": {
                "$numberLong": "1"
            },
            "period": {
                "name": "3Q20",
                "startDate": 20200629,
                "endDate": 20200927
            },
            "line": "b443e9c0-fafc-4791-87c9-8e32339c7f3c",
            "channelId": "G7k5_-HWRIuF0-afe7q-rQ"
        },
        "unitRates": {
            "units": {
                "$numberLong": "0"
            },
            "rate": 0.0,
            "rcRate": 0.0
        },
        "demoValues": {
            "66": {
                "cpm": 0.0,
                "cpp": 0,
                "vpvh": 0.0,
                "imps": 0.0,
                "rcImps": 0.0,
                "ue": 0.0,
                "grps": 0.0,
                "demoId": "66"
            },
            "63": {
                "cpm": 0.0,
                "vpvh": 0.0,
                "imps": 0.0,
                "demoId": "63"
            },
            "21": {
                "cpm": 0.0,
                "cpp": 0,
                "demoId": "21"
            }
        },
        "hh-imps": 0.0
    }

Below is my Scala code:

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ParseDynamic_v2 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._
    val jsonStringDs: Dataset[String] = spark.createDataset[String](
      Seq(
        """{"_id" : {"planId" : "5f34dab0c661d8337097afb9","version" : {"$numberLong" : "1"},"period" : {"name" : "3Q20","startDate" : 20200629,"endDate" : 20200927},"line" : "b443e9c0-fafc-4791-87c9-8e32339c7f3c","channelId" : "G7k5_-HWRIuF0-afe7q-rQ"},"unitRates" : {"units" : {"$numberLong" : "0"},"rate" : 0.0,"rcRate" : 0.0},"demoValues" : {"66" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "66"},"63" : {"cpm" : 0.0,"demoId" : "63"},"21" : {"cpm" : 0.0,"demoId" : "21"}},"hh-imps" : 0.0}"""
      ))

    jsonStringDs.show()
    val df = spark.read.json(jsonStringDs)
    df.show(false)

    // demoValues is inferred as a struct with one field per dynamic key ("21", "63", "66")
    val app = df.select("demoValues.*")
    app.createOrReplaceTempView("app")
    app.printSchema()
    app.show(false)

    val verticaProperties: Map[String, String] = Map(
      "db" -> "dbname", // Database name
      "user" -> "user", // Database username
      "password" -> "****", // Password
      "table" -> "tablename", // Vertica table name
      "dbschema" -> "public", // Vertica schema in which the table will reside
      "host" -> "localhost", // Host on which Vertica is currently running
      "hdfs_url" -> "hdfs://localhost:8020/user/hadoop/planheader/", // HDFS directory in which the intermediate ORC file persists before it is sent to Vertica
      "web_hdfs_url" -> "webhdfs://localhost:50070/user/hadoop/planheader/"
    )

    val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
    // batch write (save() returns Unit, so there is nothing to assign)
    df.write.format(verticaDataSource).options(verticaProperties).mode("overwrite").save()

    // streaming write
    val saveToVertica: DataFrame => Unit =
      dataFrame =>
        dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save()

    val checkpointLocation = "/user/hadoop/planheader/checkpoint"
    // NOTE: writeStream can only be called on a streaming DataFrame (one created with
    // spark.readStream); `df` above is a batch DataFrame, so this part fails as written.
    val streamingQuery = df.writeStream
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", checkpointLocation)
      //.trigger(ProcessingTime("25 seconds"))
      // explicit parameter types avoid the overload ambiguity of foreachBatch in Scala 2.12
      .foreachBatch((ds: DataFrame, _: Long) => saveToVertica(ds))
      .start()

    streamingQuery.awaitTermination()
  }
}


    

Expected output:

[Image: expected output]

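Answers

Here is what I did using Vertica:

I created a flex table, loaded the JSON into it, and used Vertica's flex-table function COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW() to get a view.

It turns out to be a single-row table: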

-- CREATE the Flex Table
CREATE FLEX TABLE demovals();

-- copy it using the built-in JSON parser (it creates a map container
-- with all key-value pairs)
COPY demovals FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser();
-- out vsql:/home/gessnerm/._vfv.sql:1: ROLLBACK 4213:  Object "demovals" already exists
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 112.540 ms. All rows formatted: 112.623 ms
-- the function on the next line guesses the data types of the values
-- matching the keys, stores the guessed data types in a second table,
-- and builds a view from all keys found
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('demovals');
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.demovals_keys for updated keys
-- out The view dbadmin.demovals_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 467.551 ms. All rows formatted: 467.583 ms
-- now, select from the single-row view on the flex table,
-- one row per column in the report (expanded display: "\x")
\x
SELECT * FROM dbadmin.demovals_view;
-- out -[ RECORD 1 ]---------------+-------------------------------------
-- out _id.channelid               | G7k5_-HWRIuF0-afe7q-rQ
-- out _id.line                    | b443e9c0-fafc-4791-87c9-8e32339c7f3c
-- out _id.period.enddate          | 20200927
-- out _id.period.name             | 3Q20
-- out _id.period.startdate        | 20200629
-- out _id.planid                  | 5f34dab0c661d8337097afb9
-- out _id.version.$numberlong     | 1
-- out demovalues.21.cpm           | 0.00
-- out demovalues.21.cpp           | 0
-- out demovalues.21.demoid        | 21
-- out demovalues.21.grps          | 0.00
-- out demovalues.21.imps          | 0.00
-- out demovalues.21.rcimps        | 0.00
-- out demovalues.21.ue            | 0.00
-- out demovalues.21.vpvh          | 0.00
-- out demovalues.63.cpm           | 0.00
-- out demovalues.63.cpp           | 0
-- out demovalues.63.demoid        | 63
-- out demovalues.63.grps          | 0.00
-- out demovalues.63.imps          | 0.00
-- out demovalues.63.rcimps        | 0.00
-- out demovalues.63.ue            | 0.00
-- out demovalues.63.vpvh          | 0.00
-- out demovalues.66.cpm           | 0.00
-- out demovalues.66.cpp           | 0
-- out demovalues.66.demoid        | 66
-- out demovalues.66.grps          | 0.00
-- out demovalues.66.imps          | 0.00
-- out demovalues.66.rcimps        | 0.00
-- out demovalues.66.ue            | 0.00
-- out demovalues.66.vpvh          | 0.00
-- out hh-imps                     | 0.00
-- out unitrates.rate              | 0.00
-- out unitrates.rcrate            | 0.00
-- out unitrates.units.$numberlong | 0
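
The view names each column after the path of nested keys joined by dots (shown lower-cased in the output above), so every dynamic key simply becomes part of a column name. As a rough illustration of that flattening idea (not Vertica's actual implementation), here is a plain-Scala sketch. It assumes the JSON has already been parsed into nested `Map[String, Any]` values; `FlattenKeys` and `flatten` are hypothetical names.

```scala
// Hypothetical sketch: flatten nested maps into dotted key paths,
// similar in spirit to the flex-table view's column names.
object FlattenKeys {
  // Assumes JSON objects are represented as Map[String, Any] and
  // everything else (numbers, strings) is a leaf value.
  def flatten(node: Map[String, Any], prefix: String = ""): Map[String, Any] =
    node.flatMap { case (key, value) =>
      val path = if (prefix.isEmpty) key else s"$prefix.$key"
      value match {
        // nested object: recurse and extend the path
        case child: Map[_, _] => flatten(child.asInstanceOf[Map[String, Any]], path)
        // leaf: keep it under its full dotted path
        case leaf => Map(path -> leaf)
      }
    }

  def main(args: Array[String]): Unit = {
    val doc: Map[String, Any] = Map(
      "hh-imps" -> 0.0,
      "demoValues" -> Map(
        "66" -> Map("cpm" -> 0.0, "demoId" -> "66"),
        "21" -> Map("cpm" -> 0.0, "cpp" -> 0, "demoId" -> "21")
      )
    )
    // print one "path | value" line per leaf, sorted by path
    flatten(doc).toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k | $v") }
  }
}
```

Because new keys only produce new paths, the same code handles any set of demo IDs without changes.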

For the children (only the demoValues subtree, loaded via the parser's start_point parameter), for example:

CREATE FLEX TABLE children();
TRUNCATE TABLE children;
COPY children FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser(start_point='demoValues');
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('children');
\x
SELECT * FROM dbadmin.children_view;
-- out Time: First fetch (0 rows): 7.303 ms. All rows formatted: 7.308 ms
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 13.848 ms. All rows formatted: 13.876 ms
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.children_keys for updated keys
-- out The view dbadmin.children_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 140.381 ms. All rows formatted: 140.404 ms
-- out -[ RECORD 1 ]---
-- out 21.cpm    | 0.00
-- out 21.cpp    | 0
-- out 21.demoid | 21
-- out 21.grps   | 0.00
-- out 21.imps   | 0.00
-- out 21.rcimps | 0.00
-- out 21.ue     | 0.00
-- out 21.vpvh   | 0.00
-- out 63.cpm    | 0.00
-- out 63.cpp    | 0
-- out 63.demoid | 63
-- out 63.grps   | 0.00
-- out 63.imps   | 0.00
-- out 63.rcimps | 0.00
-- out 63.ue     | 0.00
-- out 63.vpvh   | 0.00
-- out 66.cpm    | 0.00
-- out 66.cpp    | 0
-- out 66.demoid | 66
-- out 66.grps   | 0.00
-- out 66.imps   | 0.00
-- out 66.rcimps | 0.00
-- out 66.ue     | 0.00
-- out 66.vpvh   | 0.00
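
With `start_point='demoValues'`, each column name is just `<demoId>.<field>`. If one row per demo ID is wanted instead of one wide row, the column names can be split on the first dot and grouped by the part before it. A minimal plain-Scala sketch, assuming the view's single row has already been fetched into a `Map[String, String]` of column name to value; `RegroupByDemoId` and `regroup` are hypothetical names.

```scala
// Hypothetical sketch: regroup single-row "<demoId>.<field>" columns
// into one record per demo ID.
object RegroupByDemoId {
  // Assumes every column name contains at least one dot.
  def regroup(row: Map[String, String]): Map[String, Map[String, String]] =
    row.toSeq
      .map { case (column, value) =>
        // split "21.cpm" into the dynamic key "21" and the field "cpm"
        val Array(demoId, field) = column.split("\\.", 2)
        (demoId, field, value)
      }
      .groupBy(_._1)
      .map { case (demoId, entries) =>
        demoId -> entries.map { case (_, field, value) => field -> value }.toMap
      }

  def main(args: Array[String]): Unit = {
    val viewRow = Map("21.cpm" -> "0.00", "21.demoid" -> "21", "63.cpm" -> "0.00", "63.demoid" -> "63")
    regroup(viewRow).toSeq.sortBy(_._1).foreach { case (id, fields) => println(s"$id -> $fields") }
  }
}
```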

Another answer:

Not sure how efficient my code is, but it gets the job done.

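     import org.apache.spark.sql.Row
     import org.apache.spark.sql.functions.{col, lit}
     import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

     // reading data from the JSON file (assumes an existing SparkSession `spark`)
     val df1 = spark.read.json("src/main/resources/data.json")
     // defining the target schema here
     val schema = StructType(
       StructField("planid", StringType, true) ::
       StructField("periodname", StringType, true) :: // period name is a string such as "3Q20"
       StructField("cpm", DoubleType, true) ::
       StructField("vpvh", DoubleType, true) ::
       StructField("imps", DoubleType, true) ::
       StructField("demoids", StringType, true) ::
       Nil)

     var someDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
     val app = df1.select("demoValues.*", "_id.planId", "_id.period.name")
     // this has all the dynamic keys as columns, followed by planId and name
     val arr = app.columns
     for (i <- 0 to arr.length - 3) {
       println("columnname: " + arr(i))
       // fields present under this key; a missing field (e.g. vpvh under "21") becomes null
       val present = app.schema(arr(i)).dataType.asInstanceOf[StructType].fieldNames.toSet
       def field(name: String) =
         if (present(name)) col("demoValues." + arr(i) + "." + name).cast(DoubleType)
         else lit(null).cast(DoubleType)
       // traversing through the keys to get the values, e.g. demoValues.63.cpm
       val df2 = df1.select(col("_id.planId"), col("_id.period.name"),
         field("cpm"), field("vpvh"), field("imps"), lit(arr(i)).as("demoids"))
       df2.show(false)
       someDF = someDF.union(df2)
     }
     someDF.show()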
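
A possibly simpler alternative to the per-key loop: when Spark infers the schema, `demoValues` becomes a struct with one field per key, which is why `explode` does not help (it accepts only arrays and maps). Supplying an explicit schema that declares `demoValues` as a `MapType` turns the dynamic keys into map keys, and `explode` then yields one row per key. This is a sketch under a few assumptions: only `cpm`, `vpvh` and `imps` are needed (as in the schema above), fields missing under a key should come back as null, and `DemoValuesAsMap`/`explodeDemoValues` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{DoubleType, MapType, StringType, StructField, StructType}

object DemoValuesAsMap {
  // Fields expected under each dynamic demo key; absent fields are read as null.
  val demoStruct: StructType = StructType(Seq(
    StructField("cpm", DoubleType), StructField("vpvh", DoubleType),
    StructField("imps", DoubleType), StructField("demoId", StringType)))

  // Only the parts of the record needed here; other JSON fields are ignored.
  val schema: StructType = StructType(Seq(
    StructField("_id", StructType(Seq(
      StructField("planId", StringType),
      StructField("period", StructType(Seq(StructField("name", StringType))))))),
    // demoValues as a map: the dynamic keys ("66", "63", ...) become map keys
    StructField("demoValues", MapType(StringType, demoStruct))))

  def explodeDemoValues(spark: SparkSession, json: Dataset[String]): DataFrame =
    spark.read.schema(schema).json(json)
      // explode on a map column yields one row per entry, with columns `key` and `value`
      .select(col("_id.planId").as("planid"), col("_id.period.name").as("periodname"),
        explode(col("demoValues")))
      .select(col("planid"), col("periodname"), col("value.cpm").as("cpm"),
        col("value.vpvh").as("vpvh"), col("value.imps").as("imps"), col("key").as("demoids"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DemoValuesAsMap").master("local[*]").getOrCreate()
    import spark.implicits._
    // shortened version of the sample record from the question
    val json = Seq(
      """{"_id":{"planId":"5f34dab0c661d8337097afb9","period":{"name":"3Q20"}},"demoValues":{"66":{"cpm":0.0,"vpvh":0.0,"imps":0.0,"demoId":"66"},"63":{"cpm":0.0,"demoId":"63"},"21":{"cpm":0.0,"demoId":"21"}}}""").toDS()
    explodeDemoValues(spark, json).show(false)
    spark.stop()
  }
}
```

New demo IDs need no code change because they are just new map keys; adding a new field, on the other hand, means adding it to `demoStruct`.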
