Flattening JSON with Scala

I have a JSON file containing the following data:

{
    "@odata.context": "XXXX",
    "value": [
        {
            "@odata.etag": "W/\"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn\"",
            "E_No": 345345,
            "G_Code": "007",
            "G_2_Code": ""
        },
        {
            "@odata.etag": "W/\"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn\"",
            "E_No": 234543,
            "G_Code": "008",
            "G_2_Code": ""
        }
    ],
    "@odata.nextLink": "XXXX"
}

I am trying to flatten this in Databricks using Scala. I created a DataFrame, DF:

val DF = spark.read.json(path)

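I want to write the result out as JSON, and I need a DataFrame that contains only E_No, G_Code, and G_2_Code. The remaining columns can be dropped from the DataFrame.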

I tried running this JSON through flattening code that I found on a blog:

def flattenDataframe(df: DataFrame): DataFrame = {

    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    val length = fields.length
    
    for(i <- 0 to fields.length-1){
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match {
        case arrayType: ArrayType =>
          val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
          val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
          return flattenDataframe(explodedDf)
        case structType: StructType =>
          val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
          val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".","_"))))
          val explodedf = df.select(renamedcols:_*)
          return flattenDataframe(explodedf)
        case _ =>
      }
    }
    df
  }

When I run the following command, I get an error:

val flattendedJSON = flattenDataframe(DF)

extraneous input '@' expecting {'(','COLLECT','CONVERT','DELTA','HISTORY','MATCHED','MERGE','OPTIMIZE','SAMPLE','TIMESTAMP','UPDATE','VERSION','ZORDER','ADD','AFTER','ALL','ALTER','ANALYZE','AND','ANTI','ANY','ARCHIVE','ARRAY','AS','ASC','AT','AUTHORIZATION','BETWEEN','BOTH','BUCKET','BUCKETS','BY','CACHE','CASCADE','CASE','CAST','CHANGE','CHECK','CLEAR','CLONE','CLUSTER','CLUSTERED','CODEGEN','COLLATE','COLLECTION','COLUMN','COLUMNS','COMMENT','COMMIT','COMPACT','COMPACTIONS','COMPUTE','CONCATENATE','CONSTRAINT','COPY','COPY_OPTIONS','COST','CREATE','CREDENTIALS','CROSS','CUBE','CURRENT','CURRENT_DATE','CURRENT_TIME','CURRENT_TIMESTAMP','CURRENT_USER','DATA','DATABASE',DATABASES,'DAY','DBPROPERTIES','DEEP','DEFINED','DELETE','DELIMITED','DESC','DESCRIBE','DFS','DIRECTORIES','DIRECTORY','DISTINCT','DISTRIBUTE','DROP','ELSE','ENCRYPTION','END','ESCAPE','ESCAPED','EXCEPT','EXCHANGE','EXISTS','EXPLAIN','EXPORT','EXTENDED','EXTERNAL','EXTRACT','FALSE','FETCH','FIELDS','FILTER','FILEFORMAT','FILES','FIRST','FOLLOWING','FOR','FOREIGN','FORMAT','FORMAT_OPTIONS','FORMATTED','FROM','FULL','FUNCTION','FUNCTIONS','GLOBAL','GRANT','GROUP','GROUPING','HAVING','HOUR','IF','IGNORE','IMPORT','IN','INDEX','INDEXES','INNER','INPATH','INPUTFORMAT','INSERT','INTERSECT','INTERVAL','INTO','IS','ITEMS','JOIN','KEYS','LAST','LATERAL','LAZY','LEADING','LEFT','LIKE','LIMIT','LINES','LIST','LOAD','LOCAL','LOCATION','LOCK','LOCKS','LOGICAL','MACRO','MAP','MINUTE','MONTH','MSCK','NAMESPACE','NAMESPACES','NATURAL','NO',NOT,'NULL','NULLS','OF','ON','ONLY','OPTION','OPTIONS','OR','ORDER','OUT','OUTER','OUTPUTFORMAT','OVER','OVERLAPS','OVERLAY','OVERWRITE','PARTITION','PARTITIONED','PARTITIONS','PATTERN','PERCENT','PIVOT','PLACING','POSITION','PRECEDING','PRIMARY','PRINCIPALS','PROPERTIES','PURGE','QUERY','RANGE','RECORDREADER','RECORDWRITER','RECOVER','REDUCE','REFERENCES','REFRESH','RENAME','REPAIR','REPLACE','RESET','RESTRICT','REVOKE','RIGHT',RLIKE,'ROLE','ROLES','ROLLBACK','ROLLUP','ROW','ROWS','SCHEMA','SECOND','SELECT','SEMI','SEPARATED','SERDE','SERDEPROPERTIES','SESSION_USER','SET','MINUS','SETS','SHALLOW','SHOW','SKEWED','SOME','SORT','SORTED','START','STATISTICS','STORED','STRATIFY','STRUCT','SUBSTR','SUBSTRING','TABLE','TABLES','TABLESAMPLE','TBLPROPERTIES',TEMPORARY,'TERMINATED','THEN','TO','TOUCH','TRAILING','TRANSACTION','TRANSACTIONS','TRANSFORM','TRIM','TRUE','TRUNCATE','TYPE','UNARCHIVE','UNBOUNDED','UNCACHE','UNION','UNIQUE','UNKNOWN','UNLOCK','UNSET','USE','USER','USING','VALUES','VIEW','VIEWS','WHEN','WHERE','WINDOW','WITH','YEAR','+','-','*','DIV','~',STRING,BIGINT_LITERAL,SMALLINT_LITERAL,TINYINT_LITERAL,INTEGER_VALUE,EXPONENT_VALUE,DECIMAL_VALUE,DOUBLE_LITERAL,BIGDECIMAL_LITERAL,IDENTIFIER,BACKQUOTED_IDENTIFIER}(line 1,pos 0)

== SQL ==
@odata.context
^^^

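My guess is that it doesn't like the '@odata' columns, which I don't need anyway. I need to drop those columns and then see whether the flattening works.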

If there is a better way to flatten this than the code I am using, please let me know.

Thanks

Solution

Explode the nested array in the JSON, select the required fields, and then write the result to a file in JSON format.

import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

val jsonDF = spark.read.json(path)

val explodeColName = "value" // name of the column we want to explode
val flattenColName = explodeColName + "_flat" // temp name

// Field names of the struct elements inside the array column, if that column exists
val listOfColsFromArrayType =
  jsonDF.schema
    .find(s => s.name == explodeColName && s.dataType.isInstanceOf[ArrayType])
    .map(
      _.dataType
        .asInstanceOf[ArrayType]
        .elementType
        .asInstanceOf[StructType]
        .names
    )

val filterColList =
  listOfColsFromArrayType.getOrElse(throw new Exception("explode Col Name not found")) // or handle the error as needed

// Backtick-quote names that contain a dot (e.g. @odata.etag) so they are not read as nested paths
val flattenFilterCols = filterColList.map { c =>
  if (c.contains(".")) col(s"$flattenColName.`$c`") else col(s"$flattenColName.$c")
}

val flattenDF = jsonDF
  .select(explode(col(explodeColName)).as(flattenColName))
  .select(flattenFilterCols: _*)

flattenDF
  .write
  .json(outputPath)

The result will be:

{"@odata.etag":"W/\"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn\"","E_No":345345,"G_2_Code":"","G_Code":"007"}
{"@odata.etag":"W/\"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn\"","E_No":234543,"G_2_Code":"","G_Code":"008"}
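
The output above still carries @odata.etag, while the question asked for only E_No, G_Code, and G_2_Code. One way to get just those three is to select them by name after the explode. The following is a minimal sketch, not the answerer's code: the inline sample, the local SparkSession, and the names `sample`, `wanted`, and `slimDF` are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Local session for the sketch; on Databricks the provided `spark` can be used instead.
val spark = SparkSession.builder().master("local[*]").appName("flatten-sketch").getOrCreate()
import spark.implicits._

// Inline sample shaped like the question's file (etag values shortened for brevity).
val sample = Seq(
  """{"@odata.context":"XXXX","value":[{"@odata.etag":"a","E_No":345345,"G_Code":"007","G_2_Code":""},{"@odata.etag":"b","E_No":234543,"G_Code":"008","G_2_Code":""}],"@odata.nextLink":"XXXX"}"""
).toDS()

val jsonDF = spark.read.json(sample)

// One output row per element of `value`, keeping only the three requested fields.
val wanted = Seq("E_No", "G_Code", "G_2_Code")
val slimDF = jsonDF
  .select(explode(col("value")).as("v"))
  .select(wanted.map(c => col(s"v.$c")): _*)

slimDF.show(false)
// slimDF.write.json(outputPath) // outputPath: your own target location
```

Because the @odata columns are never selected, none of them has to be quoted or dropped explicitly.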

I made a few changes to your approach, and it works now.

Note that I have not renamed any of the underlying columns. If you need to reference them in later processing, wrap their names in backticks (`).
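
To illustrate the backtick quoting: the parse error in the question arises because selectExpr treats each string as a SQL expression, so a bare @odata.context fails at the '@'. A small helper can do the quoting. `quoteIdent` and `quotePath` below are hypothetical names, not part of the Spark API; this is a sketch of the idea only.

```scala
// Hypothetical helper, not part of Spark: wraps a name in backticks so the
// Spark SQL parser reads it as a single identifier. A backtick inside the
// name is escaped by doubling it.
def quoteIdent(name: String): String =
  "`" + name.replace("`", "``") + "`"

// Quotes each segment of a nested path separately, e.g. column -> struct field.
def quotePath(segments: String*): String =
  segments.map(quoteIdent).mkString(".")

println(quoteIdent("@odata.context"))      // `@odata.context`
println(quotePath("value", "@odata.etag")) // `value`.`@odata.etag`
```

The quoted strings can then be passed to selectExpr or col, for example DF.selectExpr(quoteIdent("@odata.context")).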

Test data

Flattening nested columns of array and struct types

{
   "_embedded":{
      "events":[
         {
            "name":"Hamilton",
            "type":"event",
            "id":"Z7r9jZ1Ae0EP8",
            "test":false,
            "url":"http://www.ticketsnow.com/InventoryBrowse/TicketList.aspx?PID=2927950",
            "_embedded":{
               "venues":[
                  {
                     "name":"Reynolds Hall",
                     "type":"venue",
                     "id":"Z7r9jZadyb",
                     "locale":"en-us",
                     "location":{
                        "longitude":"-115.162598",
                        "latitude":"36.182201"
                     }
                  }
               ]
            }
         }
      ]
   }
}
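
A recursive flattener for array and struct columns that backtick-quotes every name might look like the following. This is a hedged sketch, not the original poster's code: `quoted` and `flattenNested` are hypothetical names, the sample is a shortened version of the JSON above, and map columns are left untouched.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Hypothetical helper: backtick-quote one name so '@' and '.' are taken literally.
def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

// Hypothetical flattener: expands the first array or struct column it finds,
// then recurses until no array or struct columns remain.
def flattenNested(df: DataFrame): DataFrame = {
  val nested = df.schema.fields.find { f =>
    f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType]
  }
  nested match {
    case None => df
    case Some(field) =>
      val others: Array[Column] =
        df.schema.fieldNames.filter(_ != field.name).map(n => col(quoted(n)))
      val expanded: Array[Column] = field.dataType match {
        case st: StructType =>
          // One output column per child field, named parent_child.
          st.fieldNames.map { c =>
            col(quoted(field.name) + "." + quoted(c)).as(s"${field.name}_$c")
          }
        case _ =>
          // Array column: one row per element; a null or empty array yields one null row.
          Array(explode_outer(col(quoted(field.name))).as(field.name))
      }
      flattenNested(df.select(others ++ expanded: _*))
  }
}

// Usage on a shortened version of the sample above (local session assumed).
val spark = SparkSession.builder().master("local[*]").appName("flatten-nested").getOrCreate()
import spark.implicits._

val events = spark.read.json(Seq(
  """{"_embedded":{"events":[{"name":"Hamilton","_embedded":{"venues":[{"name":"Reynolds Hall","location":{"longitude":"-115.162598","latitude":"36.182201"}}]}}]}}"""
).toDS())

val flat = flattenNested(events)
flat.printSchema()
```

Quoting through `col` avoids the SQL parse error from the question, because each name is resolved as a quoted identifier instead of being parsed as a bare expression.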
