如何解决Webflux,基于Reactive Publisher的流程在中间被打断
该方法假设要从MongoDB中获取对象,并基于此方法对每个对象调用两次外部API,然后将结果保存回数据库。
不确定为什么在第5行中中断了整个流程。(因为调试标记的函数最后被调用了)。我试图确保所有调用的函数都是反应性的。在这一点上,不知道怎么了。反应式编程也很新。
fun fillMissingThings() {
reactiveThingsRepository.findAllBySomeCondition(EMPTY)
.map { it.name.getName() }
.flatMap(this::fetchIdByName) <-- this method is called last
.flatMap(this::fetchThingById)
.flatMap { reactiveThingsRepository.save(it) }
.subscribe()
}
fun fetchIdByName(name: String): Flux<String> =
webClient.get()
.uri(someUrlBasedOnName)
.retrieve()
.bodyToFlux(Array<SearchIdDto>::class.java)
.onErrorResume { Flux.empty() }
.retry(3)
.flatMap { Flux.fromArray(it) }
.map { it.id!! }
fun fetchThingById(thingId: String): Flux<Thing> =
webClient.get()
.uri(someUrlBasedOnId)
.retrieve()
.bodyToFlux(ThingDto::class.java)
.onErrorResume { Flux.empty() }
.retry(3)
.map { it.toEntity().setDefaultValues() }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。