Updating a parallelized loop's results during each iteration

I have some R code that compiles demographic data from the US Census for every US state into a single list object. Since there are roughly 11 million blocks, the block-level code would take about a week to run as a sequential loop, so I am trying to parallelize over states to speed it up. I have gotten this far:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()

CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
  county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = s, geo = "block",  sex = TRUE)
  # return the per-state list; assigning to a global object inside a worker has no effect
  list(state = s, sex = TRUE, block = block, tract = tract, county = county)
})

However, I need to make this more robust. The Census API sometimes has problems, so I would like CensusObj to be updated at each state iteration so that completed work is not lost when something goes wrong. That way, if an error occurs (for example, if I misspell "WY" as "WU"), I can restart the loop for the remaining states.

Is it possible to achieve this somehow? I am open to other parallelization approaches.


The code above runs, but it seems to run into memory issues:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

I have R_MAX_VSIZE = 8Gb in my .Renviron, but I am not sure how that gets divided among my machine's 8 cores. All of this suggests that I need to store the result of each iteration on disk rather than trying to hold everything in memory and assembling the object at the end.

Solutions

Here is a solution using doParallel (with an option for UNIX systems, though you can also use it on Windows, see here) and foreach, which stores the results for each state separately, then reads the individual files back in and combines them into one list.

library(doParallel)
library(foreach)

path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
                     county <- census_geo_api(key = "XXX",state = state,geo = "county",age = TRUE,sex = TRUE)
                     tract  <- census_geo_api(key = "XXX", state = state, geo = "tract", sex = TRUE)
                     block  <- census_geo_api(key = "XXX", state = state, geo = "block", sex = TRUE)
                     results <- list(state = state,sex = TRUE,block = block,tract = tract,county = county)
                     
                     # store the results as rds
                     saveRDS(results,file = paste0(path_results,"/",state,".Rds"))
                     
                     # remove the results
                     rm(county)
                     rm(tract)
                     rm(block)
                     rm(results)
                     gc()
                     
                     # just return a string
                     paste0("done with ",state)
}

library(purrr)
# combine the results to a list
result_files <- list.files(path = path_results, pattern = "\\.Rds$")
# name each element after its own file: list.files returns alphabetical order,
# which need not match the order of `states`
CensusObj_block_age_sex <- set_names(result_files, sub("\\.Rds$", "", result_files)) %>% 
  map(~ readRDS(file = file.path(path_results, .x)))
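Because every finished state leaves a .Rds file behind, a crashed run can be resumed by computing only the missing states. A minimal sketch, reusing `path_results` and `states` from above:

```r
# states that already have a result file on disk
done      <- tools::file_path_sans_ext(list.files(path = path_results, pattern = "\\.Rds$"))
# states still to fetch; rerun the foreach loop over this vector instead of `states`
remaining <- setdiff(states, done)
```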

A possible solution I have is to log the value of censusObj to a text file, i.e. print censusObj in each iteration. The doSNOW package can be used for this kind of logging.

library(doSNOW)
cl <- makeCluster(1,outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","PR")
foreach(i=1:length(states),.combine=rbind,.inorder = TRUE) %dopar% {
    county <- "A"
    tract  <- "B"
    block  <- "C"
    censusObj <- data.frame(state = states[i], age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
    # edit: print json objects to easily extract from the file
    cat(sprintf("%s\n",rjson::toJSON(censusObj)))
}
stopCluster(cl)

This logs the value of censusObj in abc.out, and also logs the error if the program crashes, but you will still have the most recent value of censusObj recorded in abc.out.

Here is the output of the last iteration in the log file:

Type: EXEC {"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"} Type: DONE

Type: EXEC means the iteration has started, and Type: DONE means the execution is complete. The result of cat appears between these two statements for each iteration. The values of censusObj can now be extracted from the log file as follows:

Lines = readLines("abc.out")
results = list()
for(i in Lines){
    # skip processing logs created by doSNOW
    if(!startsWith(i,"starting") && !startsWith(i,"Type:")){
        results = rlist::list.append(results,jsonlite::fromJSON(i))      
    }
}

results will contain one element for each value printed in abc.out.

> head(results,1)
[[1]]
[[1]]$state
[1] "AL"

[[1]]$age
[1] TRUE

[[1]]$sex
[1] TRUE

[[1]]$block
[1] "C"

[[1]]$tract
[1] "B"

[[1]]$county
[1] "A"

It is not a very clean solution, but it works.


In case of an API error, you can retry the computation inside a tryCatch within future_lapply, up to maxtrials times.
In the resulting list you get, for each computation, the number of trials and the final status, OK or Error.

    states <- c("AL","PR")
    library(future.apply)
    #> Loading required package: future
    plan(multiprocess)
    ptm <- proc.time()

    maxtrials <- 3

    # mock of the real API call, for a reproducible example
    census_geo_api <-
      function(key = "XXX", state = s, geo = "county", sex = TRUE) {
        paste(state, '-', geo)
      }


    CensusObj_block_age_sex <- future_lapply(states,function(s) {
      ntrials <- 1
      while (ntrials <= maxtrials) {
        hasError <- tryCatch({
          # simulate a random API error
          if (runif(1) > 0.3) {stop("API failed")}
          county <- census_geo_api(key = "XXX", state = s, geo = "county", sex = TRUE)
          tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  sex = TRUE)
          block  <- census_geo_api(key = "XXX", state = s, geo = "block",  sex = TRUE)
        },error = function(e)
          e)

        if (inherits(hasError,"error")) {
          ntrials <- ntrials + 1
        } else { break}
          
      }
      
      if (ntrials > maxtrials) {
        res <- list(state = s,status = 'Error',ntrials = ntrials-1,age = NA,sex = NA,block = NA,tract = NA,county = NA)
      } else  {
        res <- list(state = s, status = 'OK', ntrials = ntrials, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
      }
      res
    }
    )

    CensusObj_block_age_sex[[1]]
    #> $state
    #> [1] "AL"
    #> 
    #> $status
    #> [1] "OK"
    #> 
    #> $ntrials
    #> [1] 3
    #> 
    #> $age
    #> [1] TRUE
    #> 
    #> $sex
    #> [1] TRUE
    #> 
    #> $block
    #> [1] "AL - block"
    #> 
    #> $tract
    #> [1] "AL - tract"
    #> 
    #> $county
    #> [1] "AL - county"

<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>
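The two ideas combine naturally: the retry loop from this answer plus the per-state saveRDS checkpointing from the first answer. A sketch under the same assumptions (the real census_geo_api from the question, a writable `path_results` directory, and `states` defined as above); states with an existing .Rds file are skipped, so the whole loop can simply be rerun after a crash:

```r
library(future.apply)
plan(multisession)  # multisession instead of the deprecated multiprocess

path_results <- "my_path"
maxtrials    <- 3

fetch_state <- function(s) {
  out_file <- file.path(path_results, paste0(s, ".Rds"))
  # checkpoint: skip states already fetched in a previous run
  if (file.exists(out_file)) return(paste0(s, ": skipped"))
  for (trial in seq_len(maxtrials)) {
    res <- tryCatch({
      county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
      tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  sex = TRUE)
      block  <- census_geo_api(key = "XXX", state = s, geo = "block",  sex = TRUE)
      list(state = s, block = block, tract = tract, county = county)
    }, error = function(e) e)
    if (!inherits(res, "error")) {
      saveRDS(res, file = out_file)   # persist immediately so nothing is lost
      return(paste0(s, ": OK after ", trial, " trial(s)"))
    }
  }
  paste0(s, ": Error after ", maxtrials, " trials")
}

status <- future_lapply(states, fetch_state)
```

Only a short status string travels back from each worker, which also sidesteps the vector-memory error from the question; the full data is read back afterwards with readRDS as in the first answer.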
