如何解决如何在带有Kafka事务的Spring Integration Kafka中使用批处理提交
我已经将spring集成kafka用于读取过程写入的简单情况。消息驱动终结点bean的配置如下:
<kafka:message-driven-channel-adapter listener-container="listenerContainer"
channel="processChannel"
mode="batch"/>
<bean id="listenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="test"/>
<property name="transactionManager" ref="kafkaTransactionManager"/>
<property name="eosMode" value="BETA"/>
</bean>
</constructor-arg>
</bean>
当 KafkaMessageListenerContainer 对一批记录(例如10条记录)进行轮询时,将启动kafka事务,但是 IntegrationBatchMessageListener将所有10条记录集成到一条消息中 strong>
message = toMessagingMessage(records,acknowledgment,consumer);
有什么解决方案可以在单个kafka事务中处理一批,但在消息驱动的端点中以及在提交事务之后分别处理每个记录?
如果可能的话,我不想使用手动固定。
我见过类 BatchToRecordAdapter ,但认为它在消息驱动的端点中不可用。
解决方法
否;但是您可以添加<splitter input-channel="processChannel" output-channel="..."/>
来将列表分成单独的消息。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。