如何解决Spring Integration - 2 MessageHandlers 处理一半的消息
我正在使用带有 Transformer 的 spring 集成框架
inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 编写器
@Bean
public DirectChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@Transformer(inputChannel="inboundChannel",outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(Item.class);
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new ...
return ...;
}
@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
return new ...;
}
在我覆盖的两个处理程序中
public void handleMessage(Message<?> message)
问题:如果在kafka中总共有N条消息, 然后每个 handleMessage() 被精确调用 n/2 次!
我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道并且总共有 n 条消息。
我错过了什么?
(如果我禁用了 kafak 处理程序,第二个处理程序将获取所有 n 条消息)
更新:
我需要订阅以从同一通道获取所有消息(kafka 处理程序将对原始数据执行某些操作,jdbc 处理程序将推送转换后的数据)
数据)
解决方法
首先,您的 inboundChannel
和 outboundChannel
已失效:您无处(至少在问题中)指定它们的名称。
input
和 output
之类的名称被框架采用并用于创建新的 MessageChannel
bean,这些 bean 在其他地方使用。
现在看看你有什么:
@Transformer(inputChannel="input"
@ServiceActivator(inputChannel = "input")
它们都是同一个 input
频道的订阅者,并且因为它是由框架自动创建为 DirectChannel
的。此通道基于循环 LoadBalancingStrategy
,因此您会在 Kafka 中看到 n/2
,因为它的服务激活器仅处理发送到该 input
通道的每一秒消息。
请在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/core.html#channel-configuration-directchannel
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。