如何解决动态生成具有过滤器的代码,具有ColumnRenamed和合并条件Scala Spark
我有一段代码要动态生成。我想以列表或序列的形式在下面的列中使用filter
,coalesce
和drop
语句执行withColumnRenamed
操作。
这是我要动态接受的列的列表(此处为字符串)。
val cols = "a|tmp_a,b|tmp_b"
代码看起来像这样:
val df1 = df2.filter(!(coalesce(col("a"),lit(0)) === coalesce(col("tmp_a"),lit(0))) || !(upper(col("b")) === upper(col("tmp_b"))))
.drop("a")
.drop("b")
.withColumnRenamed("tmp_a","a")
.withColumnRenamed("tmp_b","b")
如果将更多列添加到cols
,该代码如何动态调整?新的列对应使用与上述“ b | tmp_b”相同的过滤条件。
解决方法
给输入一个带有成对列名称的输入,您可以创建两种类型的过滤条件(在第一个列对下面使用第一个过滤器模式,其余使用第二个过滤器模式)。过滤数据框后,可以使用drop
来应用withColumnRenamed
和foldLeft
。
val cols = "a|tmp_a,b|tmp_b,c|tmp_c".split(",").map(_.split("\\|"))
val filterCondHead = !(coalesce(col(cols.head(0)),lit(0)) === coalesce(col(cols.head(1)),lit(0)))
val filterCondTail = cols.tail.map(c => !(upper(col(c(0))) === upper(col(c(1))))).reduce(_ || _)
val df2 = df.filter(filterCondHead || filterCondTail)
val df3 = cols.foldLeft(df2){ case(df,c) =>
df.drop(c(0)).withColumnRenamed(c(1),c(0))
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。