如何解决在每次迭代期间通过更新并行化循环
我有一些R代码,它将美国人口普查中来自美国所有州的人口统计数据汇总到一个列表对象中。由于有大约1100万个块,因此块级代码可能需要一周的时间才能作为顺序循环运行,因此,我尝试对状态进行并行化以使其更快。我已经达到了这个目标:
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()
CensusObj_block_age_sex[states] <- future_lapply(states,function(s){
county <- census_geo_api(key = "XXX",state = s,geo = "county",age = TRUE,sex = TRUE)
tract <- census_geo_api(key = "XXX",geo = "tract",sex = TRUE)
block <- census_geo_api(key = "XXX",geo = "block",sex = TRUE)
censusObj[[s]] <- list(state = s,sex = TRUE,block = block,tract = tract,county = county)
}
)
但是,我需要使其更强大。有时Census API会出现问题,因此我希望CensusObj在每次状态迭代时都进行更新,以便在出现问题时不会丢失已完成的数据。这样,如果发生错误(例如,我将“ WY”拼写为“ WU”),我可以重新启动剩余状态的循环
是否有可能以某种方式实现这一目标?我愿意接受其他并行化方法。
上面的代码可以运行,但似乎会遇到内存问题:
Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists,which suggests that the localhost worker is still alive.
我的.Renviron中有R_MAX_VSIZE = 8Gb,但是我不确定如何在机器的8个内核之间进行划分。所有这些都表明我需要存储每次迭代的结果,而不是尝试将其全部保存在内存中,然后在最后将对象附加在一起。
解决方法
这里是一种使用doParallel
(具有UNIX系统的选项,但您也可以在Windows上使用,请参见here)和foreach
的解决方案,该解决方案存储了每个状态分开,然后读入单个文件并将它们组合到一个列表中。
library(doParallel)
library(foreach)
path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
county <- census_geo_api(key = "XXX",state = state,geo = "county",age = TRUE,sex = TRUE)
tract <- census_geo_api(key = "XXX",geo = "tract",sex = TRUE)
block <- census_geo_api(key = "XXX",geo = "block",sex = TRUE)
results <- list(state = state,sex = TRUE,block = block,tract = tract,county = county)
# store the results as rds
saveRDS(results,file = paste0(path_results,"/",state,".Rds"))
# remove the results
rm(county)
rm(tract)
rm(block)
rm(results)
gc()
# just return a string
paste0("done with ",state)
}
library(purrr)
# combine the results to a list
result_files <- list.files(path = path_results)
CensusObj_block_age_sex <- set_names(result_files,states) %>%
map(~ readRDS(file = paste0(path_results,.x)))
,
我有一个可能的解决方案是将CensusObj的值记录到文本文件,即在每次迭代中打印CensusObj。 doSNOW软件包可用于记录日志
library(doSNOW)
cl <- makeCluster(1,outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","PR")
foreach(i=1:length(states),.combine=rbind,.inorder = TRUE) %dopar% {
county <- "A"
tract <- "B"
block <- "C"
censusObj <- data.frame(state = states[i],county = county)
# edit: print json objects to easily extract from the file
cat(sprintf("%s\n",rjson::toJSON(censusObj)))
}
stopCluster(cl)
这将在abc.out中记录censusObj的值,并在程序崩溃时记录错误,但是您将获得在abc.out中登录的censusObj的最新值。
这是日志文件中最后一次迭代的输出:
Type: EXEC {"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"} Type: DONE
Type:EXEC
表示迭代已开始,而Type:DONE
表示执行已完成。 cat
的结果将出现在每次迭代的这两个语句之间。现在,可以从日志文件中提取CensusObj
的值,如下所示:
Lines = readLines("abc.out")
results = list()
for(i in Lines){
# skip processing logs created by doSNOW
if(!startsWith(i,"starting") && !startsWith(i,"Type:")){
results = rlist::list.append(results,jsonlite::fromJSON(i))
}
}
results
将包含所有在abc.out中打印的值的元素。
> head(results,1)
[[1]]
[[1]]$state
[1] "AL"
[[1]]$age
[1] TRUE
[[1]]$sex
[1] TRUE
[[1]]$block
[1] "C"
[[1]]$tract
[1] "B"
[[1]]$county
[1] "A"
这不是一个很干净的解决方案,但是可以。
,在API错误的情况下,您可以在tryCatch
内使用future_lapply
尝试重新启动计算,最多maxtrials
。
在结果列表中,您将为每次计算获得试验次数和最终状态OK
或Error
:
states <- c("AL","PR")
library(future.apply)
#> Le chargement a nécessité le package : future
plan(multiprocess)
ptm <- proc.time()
maxtrials <- 3
census_geo_api <-
function(key = "XXX",state = s,sex = TRUE) {
paste(state,'-',geo)
}
CensusObj_block_age_sex <- future_lapply(states,function(s) {
ntrials <- 1
while (ntrials <= maxtrials) {
hasError <- tryCatch({
#simulate random error
if (runif(1)>0.3) {error("API failed")}
county <- census_geo_api(key = "XXX",sex = TRUE)
tract <- census_geo_api(key = "XXX",sex = TRUE)
block <- census_geo_api(key = "XXX",sex = TRUE)
},error = function(e)
e)
if (inherits(hasError,"error")) {
ntrials <- ntrials + 1
} else { break}
}
if (ntrials > maxtrials) {
res <- list(state = s,status = 'Error',ntrials = ntrials-1,age = NA,sex = NA,block = NA,tract = NA,county = NA)
} else {
res <- list(state = s,status = 'OK',ntrials = ntrials,county = county)
}
res
}
)
CensusObj_block_age_sex[[1]]
#> $state
#> [1] "AL"
#>
#> $status
#> [1] "OK"
#>
#> $ntrials
#> [1] 3
#>
#> $age
#> [1] TRUE
#>
#> $sex
#> [1] TRUE
#>
#> $block
#> [1] "AL - block"
#>
#> $tract
#> [1] "AL - tract"
#>
#> $county
#> [1] "AL - county"
<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。