如何解决Project Reactor:有条件地重复上一步的返回值
上下文
首先:我是Project Reactor的新手,还是Java的新手,我已经在寻找答案,但是到目前为止,他们并没有帮助我。
我正在使用Cosmos DB,并希望扩展。缩放过程可能需要一段时间(对于大型缩放),因此我想初始化缩放并等待直到缩放完成(可以通过container.readThroughput()
-> isReplacePending()
完成)。
int secondsToWaitWhilePending = 10;
CosmosAsyncContainer container = this.getContainer(database,containerId);
ThroughputResponse initialThroughputResponse = container.readThroughput().block();
if (initialThroughputResponse.isReplacePending()) {
logger.warn("changeRequestUnitsOnContainerAsync: Another throughput change is still pending,please try again later...");
} else {
logger.info("changeRequestUnitsOnContainerAsync: No other throughput change is pending...");
int currentThroughput = initialThroughputResponse.getProperties().getManualThroughput();
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: The current throughput is '%s'...",currentThroughput));
if (currentThroughput != targetThroughput) {
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling to '%s'...",targetThroughput));
ThroughputProperties targetThroughputProperties = ThroughputProperties.createManualThroughput(targetThroughput);
ThroughputResponse replaceThroughputResponse = container.replaceThroughput(targetThroughputProperties).block();
boolean replaceIsPending = replaceThroughputResponse.isReplacePending();
if (replaceIsPending && waitForScalingToComplete) {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and waiting for it to complete...");
} else if (replaceIsPending && !waitForScalingToComplete) {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and but not waiting for it to complete...");
while (replaceIsPending) {
replaceThroughputResponse = container.readThroughput().block();
replaceIsPending = replaceThroughputResponse.isReplacePending();
TimeUnit.SECONDS.sleep(secondsToWaitWhilePending);
}
} else {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling already completed...");
}
} else {
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Container has already throughput of '%s'...",targetThroughput));
}
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling throughput for container with id '%s' finished...",containerId));
}
目标
我想摆脱所有障碍,并返回Mono消费者可以订阅的内容。
目前我有以下
Mono<Void> response = container.readThroughput()
.flatMap(initialThroughputResponse -> {
if (initialThroughputResponse.isReplacePending()) {
logger.warn("changeRequestUnitsOnContainerAsync: Another throughput change is still pending,please try again later...");
return Mono.empty();
} else {
logger.info("changeRequestUnitsOnContainerAsync: No other throughput change is pending...");
int currentThroughput = initialThroughputResponse.getProperties().getManualThroughput();
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: The current throughput is '%s'...",currentThroughput));
if (currentThroughput != targetThroughput) {
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling to '%s'...",targetThroughput));
ThroughputProperties targetThroughputProperties = ThroughputProperties.createManualThroughput(targetThroughput);
return container.replaceThroughput(targetThroughputProperties);
} else {
logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Container has already throughput of '%s'...",targetThroughput));
return Mono.empty();
}
}
})
.flatMap(replaceThroughputResponse -> {
if (replaceThroughputResponse != null) {
boolean replaceIsPending = replaceThroughputResponse.isReplacePending();
if (replaceIsPending && waitForScalingToComplete) {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and waiting for it to complete...");
container.readThroughput().flatMap(replaceThroughputCheckResponse -> {
return Mono.just(replaceThroughputCheckResponse);
}).???????????????;
return Mono.empty();
} else if (replaceIsPending && !waitForScalingToComplete) {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and but not waiting for it to complete...");
return Mono.empty();
} else {
logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling already completed...");
return Mono.empty();
}
} else {
return Mono.empty();
}
});
它有效,但是缺少一件事:while循环/在缩放仍在进行时重复(请参见???????????????
)。理想情况下,我想重复查询状态,并根据响应进行下一次迭代(或不迭代)。
我该如何实现? 我的方法是好的还是您会做另一种方式(例如,它是否按预期的方式工作(即无阻塞但在整个顺序内都在顺序内))? 我在另一点订阅时的行为如何?它会等待吗(就像在C#中等待)一样?
为什么到底像Java这样的流行语言使async如此复杂(而许多其他语言只是提供了它-易于使用,可读性强)?从我的角度来看,是否可以有轻量级线程也没关系-如果每个人都异步,那么Java为什么也不能做到这一点呢? ->无需回答,只是想表达我的理解力...
谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。