如何解决展平架构
如何仅针对某些特定列而不是全部将其平铺在火花中?
def flattenDataframe(df: DataFrame): DataFrame = {
val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
val length = fields.length
for(i <- 0 to fields.length-1){
val field = fields(i)
val fieldtype = field.dataType
val fieldName = field.name
fieldtype match {
case arrayType: ArrayType =>
val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
// val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
return flattenDataframe(explodedDf)
case structType: StructType =>
val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".","_").replace("$","").replace(" ","_").replace("-","_"))))
val explodedf = df.select(renamedcols:_*)
return flattenDataframe(explodedf)
case _ =>
}
}
df
}
我正在使用上面的代码,但是它使所有列扁平化。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。