如何解决通过Pyspark中的转换减少多项操作/过滤器优化
F1 = df.filter((df.Identifier1>0)).groupBy().avg('Amount')
F2 = df.filter((df.Identifier1>2)).groupBy().avg('Amount')
F3 = df.filter((df.Identifier2<2)).groupBy().avg('Amount')
F4 = df.filter((df.Identifier2<4)).groupBy().avg('Amount')
#Alternatively also tried another way for avg calculation,F1 = df.filter((df.Identifier1>0)).agg(avg(col('Amount')))
..
计算这些平均值后,我试图使用平均值计算中使用的相同过滤器将它们分配给主df中的记录,分为两列A1和A2 。
df = df.withColumn("A1",when((col("Identifier1") > 0)),(F1.collect()[0][0]))
….
….
.otherwise(avg(col('Amount')))
df = df.withColumn("A2",when((col("Identifier2") <2 )),(F3.collect()[0][0]))
….
….
.otherwise(avg(col('Amount')))
我面临两个问题:
-
当平均值之一为Null时,我在调用
时出错collect() or first()
错误:
Unsupported literal type class java.util.ArrayList [null]
-
由于涉及多个动作,该过程需要2个小时以上的时间。
欢迎您提供上述帮助。
解决方法
为您的过滤条件创建一列,例如
+---+--------+-----------+-----------+------+
| ID|Category|Identifier1|Identifier2|Amount|
+---+--------+-----------+-----------+------+
| 12| A| 2| 1| 100|
| 23| B| 7| 8| 500|
| 34| C| 1| 4| 300|
+---+--------+-----------+-----------+------+
df.withColumn('group',when(df.Identifier1 > 0,array(lit(1))).otherwise(array(lit(None)))) \
.withColumn('group',when(df.Identifier1 > 2,array_union(col('group'),array(lit(2)))).otherwise(col('group'))) \
.withColumn('group',when(df.Identifier2 < 2,array(lit(3)))).otherwise(col('group'))) \
.withColumn('group',when(df.Identifier2 < 4,array(lit(4)))).otherwise(col('group'))) \
.withColumn('group',explode('group')) \
.groupBy('group').agg(sum('Amount').alias('sum'),avg('Amount').alias('avg')).show()
+-----+---+-----+
|group|sum| avg|
+-----+---+-----+
| 1|900|300.0|
| 3|100|100.0|
| 4|100|100.0|
| 2|500|500.0|
+-----+---+-----+
,然后按分组。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。