如何解决在PySpark中反向分组
我不确定问题本身是否正确。我为SQL找到的解决方案不适用于Hive SQL或禁止递归。 因此,我想在Pyspark中解决问题,并且需要解决方案,或者至少是解决问题的想法。
我有一个原始表,如下所示:
+--------+----------+
|customer|nr_tickets|
+--------+----------+
| A| 3|
| B| 1|
| C| 2|
+--------+----------+
这是我想要桌子的方式:
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+
您有什么建议吗?
非常感谢您!
解决方法
对于 Spark2.4+
,将 array_repeat
与 explode
一起使用。
from pyspark.sql import functions as F
df.selectExpr("""explode(array_repeat(customer,cast(nr_tickets as int))) as customer""").show()
#+--------+
#|customer|
#+--------+
#| A|
#| A|
#| A|
#| B|
#| C|
#| C|
#+--------+
,
您可以通过遍历行(组)来创建新的数据框。
使用customer
为该客户重复Row(customer=a["customer"])
次nr_tickets
(range(int(a["nr_tickets"]))
)的行的第一个制作清单
df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
您可以将它们存储并追加到列表中,以后再用它创建一个数据框。
df= spark.createDataFrame(df_list)
总体
from pyspark.sql import Row
df_list = []
for a in df.select(["customer","nr_tickets"]).collect():
df_list = df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
df= spark.createDataFrame(df_list)
df.show()
您也可以使用列表理解为
from pyspark.sql import Row
from functools import reduce #python 3
df_list = [
[Row(customer=a["customer"])]*int(a["nr_tickets"])
for a in df.select(["customer","nr_tickets"]).collect()
]
df= spark.createDataFrame(reduce(lambda x,y: x+y,df_list))
df.show()
生产
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+
,
同时,我自己也找到了解决方案:
for i in range(1,max_nr_of_tickets):
table = table.filter(F.col('nr_tickets') >= 1).union(test)
table = table.withColumn('nr_tickets',F.col('nr_tickets') - 1)
说明:DF的“表”和“测试”开头相同。 因此,“ max_nr_of_tickets”只是最高的“ nr_tickets”。有用。 我只是在为最大数字格式苦苦挣扎:
max_nr_of_tickets = df.select(F.max('nr_tickets')).collect()
我不能在for循环的范围内使用结果,因为它是一个列表。所以我手动输入最高的数字。 有什么想法可以将max_nr_of_tickets设置为正确的格式,以便循环范围可以接受吗?
谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。