如何解决处理可能在集成流程中发生的异常
我有一个REST控制器,该控制器调用带有@MessagingGateway(errorChannel = ERROR_CHANNEL)
注释的网关
这样,无论网关启动的集成流下游发生什么错误,都将流入错误通道,该通道将由另一个集成流处理,这可以按预期进行。
现在,在另一种情况下,集成流从Kafka读取消息,将这些消息路由到另一个通道,另一个集成流处理这些消息,另一个流将HTTP请求发送到远程服务。
public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
return attachmentEventBaseFlow(".*")
.filter(Message.class,m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY,String.class).equalsIgnoreCase(tenantId));
}
private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
.log(LoggingHandler.Level.DEBUG,"Inside Kafka Consumer")
.filter(Message.class,m -> filter(m,eventRegex))
.transform(messageToEventTransformer);
}
@Bean
public IntegrationFlow kafkaConsumerFlow() {
return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
.route(Message.class,m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY,String.class),p -> p
.resolutionRequired(false)
.channelMapping("eventType","transformMessagesFromKafkaAndPublishAnotherEvent")
.defaultOutputChannel("nullChannel"))
.get();
}
@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
return flow -> flow
.transform(transformer)
.handle( getKafkaHandler() );
}
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(gatewayCall,e -> e.advice(expressionAdvice()));
}
如何处理以上流程中可能发生的异常?
如您所见,我正在使用ExpressionEvaluatingRequestHandlerAdvice,它可以处理handle方法,但是不确定如何处理转换器中可能发生的异常吗?
配置有错误通道的按摩网关在通过休息控制器调用网关时会成功,但是当流由Kafka使用者启动时,我就不知道如何实现此目的。
谢谢。
在阿尔特姆(Artem)添加澄清要求后编辑:
这是集成流程的配置,该流程将请求发布到远程服务,并且在没有ExpressionEvaluatingRequestHandlerAdvice的情况下,似乎无法捕获其异常并将其路由到errorChannel:
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(getOAuth2Handler(HttpMethod.PUT,"remote url"),e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}
private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod,String url) {
return new OAuth2RequestHandler(oAuth2RestTemplate,httpMethod,url);
}
以及实现MessageHandler的OAuth2RequestHandler类
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
String requestBody = (String) message.getPayload();
ResponseEntity<String> response = oAuth2RestTemplate.exchange(url,new HttpEntity<>(requestBody),String.class);
}
解决方法
我看到您已经在Kafka消息驱动的通道适配器上使用了errorChannel()
。那么,问题是什么?
您的流程的这一部分与@MesaagingGateway
的{{1}}配置完全等效。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。