如何解决使用Java DSL时,为什么必须在入站Webflux网关上使用.fluxTransformf-> f?
在Spring Integration中使用webflux网关Java DSL时,我遇到了失败的回复。它仅适用于前几个请求(具体来说为
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
当我在入站网关上使用.fluxTransform(f -> f)
时或当我使用非活动 http出站网关时,我不会即使在具有数千个请求的jmeter基准上也无法得到错误。
- 为什么要在第一流程中致电
fluxTransform(f -> f)
才能使其正常工作? - 为什么在第二流程中使用
fluxTransform(f -> f)
时,如果没有Http.outboundGateway
,它为什么不起作用?
场景
我已经使用四个网关创建了一个路由,以进行相当复杂的设置以在远程计算机上发出Web请求,但是我
集成流程1:
入站Webflux网关->出站jms网关
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms,wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
集成流程2:
入站JMS网关->出站Webflux网关
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
完整的路线如下:
客户端Web请求->流1->(消息代理)->流2->服务器Web请求
解决方法
另一种方法是使用.channel(MessageChannels.flux())
而不是.fluxTransform(f -> f)
。这样,我们确实给WebFlux容器带来了压力,使其等待请求事件循环中的可用插槽。
这样,我们仅向JMS发送队列时就不会承受背压,而另一端的JMS使用者则无法跟上。另外,我们将请求发送到同一台Netty服务器,并在内部再次为这些内部请求获取一个事件循环槽。
如果您有兴趣,我可以像这样编写单元测试以查看发生的情况:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux",String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。