Looping, joining, and coalescing aggregates from a dictionary of lists more efficiently

I'm working with a dataframe (df) that contains incomplete fields. I average various combinations of those fields to try to return the most accurate average I can, given the information that is available.

The "various combinations" are held in a dictionary of lists. Each list is used in a .groupBy on map_df to find the average of the available information, and then the same list is used to join the result back to the original df.

Right now I have working code that produces the expected behavior, but it relies on nested loops over the dataframe, and performance on large datasets is very poor (over an hour on a dataframe with 100 million rows). Is there a more efficient way to do the looping, joining, and aggregating, using a map or some other construct instead of nested for loops?

Sample pyspark df, map_df, and group_dict:

The code below works, but it is not efficient:

df = spark.createDataFrame(
  [(1,12,'East','Q1'),(2,14,'East','Q1'),(3,12,'West','Q2'),(4,13,'West','Q2'),
   (5,13,'East','Q1'),(6,12,None,None),(7,13,'West',None),(8,12,'West',None)],
  ['id','product','location','quarter'])

map_df = spark.createDataFrame(
  [(12,'Q1',10,15),(13,'Q1',5,10),(14,'Q1',20,20),(13,'Q2',7,8),(12,'Q2',30,5)],
  ['product','quarter','cost','quantity'])

group_dict = {
  1: ['product','quarter'], 2: ['product','location'], 3: ['product','location','quarter'], 4: ['product']}
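
As a condensed sketch of the flow described above (using the sample data, with placeholder alias names cost_est / quantity_est), a single pass averages the facts in map_df at the grain of one field list and left-joins those averages back onto df:

from pyspark.sql import functions as F

fields = group_dict[1]   # e.g. ['product','quarter']
agg = (map_df.groupBy(fields)
             .agg(F.avg('cost').alias('cost_est'),
                  F.avg('quantity').alias('quantity_est')))
estimated = df.join(agg, on=fields, how='left')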

While it is slow, I do get the expected results with this:

from pyspark.sql.functions import avg, col, concat_ws, hash, when

facts = ['cost','quantity']
h = 'hash'
field_list = ['product','quarter']

origin_df = df.withColumn(h, hash(concat_ws('|', *df.columns))) # Creates a unique ID to be used in the final join, so unnecessary columns aren't pulled into each step of the loop
loop_df   = origin_df.select(h, *field_list)

## Loop over each combination of fields (each `c`) in `group_dict`, grouping and aggregating each `fact`
for k, c in group_dict.items():
  exp     = [avg(f).alias(f"{f}_coalesce") for f in facts]
  join_df = map_df # Each iteration uses `map_df` as the source for the fact being estimated, rather than an existing field in the original `df`

  loop_df = loop_df.join(join_df.groupBy(c).agg(*exp), c, 'left')
  for f in facts:
    f_co    = f"{f}_coalesce"
    if f not in loop_df.columns: # On the first pass, create the field `f` directly, so it can be coalesced with `when` in subsequent loops
      loop_df = loop_df.withColumn(f, col(f_co)).drop(f_co)
    else:
      loop_df = loop_df.withColumn(f, when(col(f).isNull(), col(f_co)).otherwise(col(f))).drop(f_co) # If `f` is still null, use the value from this `.groupBy` instead

return_df = loop_df.select(h, *facts).join(origin_df.drop(*facts), h).drop(h)
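
As an aside, the when(col(f).isNull(), ...).otherwise(col(f)) step above is the same null-fallback that pyspark.sql.functions.coalesce expresses directly, so the else branch could equivalently be written as:

from pyspark.sql.functions import coalesce

loop_df = loop_df.withColumn(f, coalesce(col(f), col(f_co))).drop(f_co)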

Where can I improve my approach? Thanks for your time!

Solution

This might be a useful partial step. First, I recreated your sample data:

import pandas as pd

df = pd.DataFrame([
    (1,12,'East','Q1'),(2,14,'East','Q1'),(3,12,'West','Q2'),(4,13,'West','Q2'),
    (5,13,'East','Q1'),(6,12,None,None),(7,13,'West',None),(8,12,'West',None)],
    columns = ['id','product','location','quarter'])

map_df = pd.DataFrame([
    (12,'Q1',10,15),(13,'Q1',5,10),(14,'Q1',20,20),(13,'Q2',7,8),(12,'Q2',30,5)],
    columns = ['product','quarter','cost','quantity'])

group_dict = {
  1: ['product','quarter'], 2: ['product','location'], 3: ['product','location','quarter'], 4: ['product']}

Now, it looks like you are attempting several merge() strategies, as defined by the group_dict dictionary. You can perform those merges in a single loop and collect the four resulting data frames:

ts = list()
for rule,fields in group_dict.items():
    t = pd.merge(left = df,right = map_df,how = 'outer',on = fields)
    t['rule'] = rule
    t['fields'] = str(fields)
    ts.append(t)
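
For a quick check of what the loop produced, you could print each frame's rule tag, field list, and shape:

for t in ts:
    print(t['rule'].iloc[0], t['fields'].iloc[0], t.shape)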

Update

The following steps show how to combine the data frames collected above.

# concatenate the data frames, and drop rows with NaN values
new_df = (pd.concat(ts)
          .loc[:, ['id','product','location','quarter','cost','quantity','rule']]
          .loc[ lambda x: x['id'].notna() ]
          .loc[ lambda x: x['cost'].notna() ]
          .loc[ lambda x: x['quantity'].notna() ]
         )

# find the first (lowest-numbered) rule that produced id, cost and quantity
new_df['1st_match'] = new_df.groupby('id')['rule'].transform(min)

# keep first (i.e.,best) match
new_df = new_df[ new_df['rule'] == new_df['1st_match'] ]

# drop helper columns
new_df = new_df.drop(columns=['rule','1st_match'])

# fields to keep
fields = ['id','product','location','quarter']

# use average cost, average quantity (some rows have more than one match)
new_df = new_df.groupby(fields,as_index=False,dropna=False)[
         ['cost','quantity']].mean()

print(new_df)

    id  product location quarter  cost  quantity
0  1.0       12     East      Q1  10.0      15.0
1  2.0       14     East      Q1  20.0      20.0
2  3.0       12     West      Q2  30.0       5.0
3  4.0       13     West      Q2   7.0       8.0
4  5.0       13     East      Q1   5.0      10.0
5  6.0       12      NaN     NaN  20.0      10.0
6  7.0       13     West     NaN   7.0       8.0
7  8.0       12     West     NaN  20.0      10.0
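
For instance, id 6 has neither a location nor a quarter, so only the product-only rule appears to produce a complete match; its values are the averages over the two product-12 rows of map_df: cost (10 + 30) / 2 = 20 and quantity (15 + 5) / 2 = 10, matching the 20.0 and 10.0 shown for that row.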
