如何解决安全地从一个容器以随机顺序使用消息切换到另一个以指定顺序
我有一个服务 (S1),它生成事件以通过 Kafka 为其他服务设置基线。服务从文件中读取数据并产生事件到三个主题 t1、t2、t3。
我有另一个服务 (S2) 并且那里的消费者 (C) 订阅了 3 个主题(t1、t2、t3)。我有一个问题,因为消费者随机从主题读取数据,所以我收到与数据关系相关的错误。我的意思是系统尝试创建实体但关系尚不存在。一般来说,由于实际(业务)流程限制(我无法在另一个实体之前创建一个实体),S2 工作正常。所以有时我需要一种方法来将消费从随机顺序切换到指定顺序 (t1 -> t2 -> t3)。
我的想法是为每个主题(m1->t1、m2->t2、m3->t3)添加一个额外的容器(M1、M2、M3),在外部事件 f.i. 上暂停 C。 MIGRATION_START 并按特定顺序开始消耗 M1、M2、M3。消耗完所有消息后,我将暂停 M1、M2、M3 并重新启动 C。
出于某些原因,我不想使用像 @KafkaListener 这样的注解。
管理容器之间切换的最佳位置在哪里?如何正确进行切换?
感谢您的回答。
我肯定会发布一些代码。这不是工作示例和想法。
class KafkaEventSource(
val pipeline: IEventPipeline,val expectedEventStreamVersion: Int,val topics: List<String>? = null,val eventContextFactory: EventContextFactory,) : AbstractConsumerSeekAware(),AcknowledgingMessageListener<UUID,IEventRecord>,ConsumerRebalanceListener,IEventSource {
private val LOG = LoggerFactory.getLogger(this::class.java)
private val containers = ConcurrentHashMap<String,AbstractMessageListenerContainer<UUID,IEventRecord>>()
private val container = AbstractMessageListenerContainer<UUID,IEventRecord>>()
private fun setupProperties(topics: List<String>): ContainerProperties {
val containerProps = ContainerProperties(*topics.toTypedArray())
val retryAdapter = RetryingMessageListenerAdapter(this,retryTemplate)
containerProps.messageListener = retryAdapter
containerProps.consumerRebalanceListener = this
containerProps.idleEventInterval = 15000
containerProps.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE // Changed from MANUAL
containerProps.transactionManager = transactionManager
containerProps.groupId = groupId
containerProps.clientId = clientId
containerProps.eosMode = ContainerProperties.EOSMode.BETA // Needed to allow automatic handling of InvalidPidMappingException
return containerProps
}
@PostConstruct
override fun start() {
val topics = topics ?: eventContextFactory.eventRegistry.topics.toSet()
container = newContainer(topics,startContainer = true)// create container with properties setupProperties
topics.forEachIndexed { index,topic -> newMigrationContainers(topic,startContainer = false) } // create containers M1,M2,M3 with properties setupProperties
}
override fun waitForEnd() {
}
override fun stop() {
}
override fun onMessage(
data: ConsumerRecord<UUID,acknowledgment: Acknowledgment
) {
// where is the best place for such code. Is it ok to pause container C here and start M1 for migration?
if (data.value().eventType == "MIGRATION_START") {
// pause container C
// resume container M1 for migration
}
pipeline.deliver(batch)
acknowledgment.acknowledge()
LOG.info("Acknowledged event $event (${event.eventId}) from Kafka " +
"topic ${data.topic()} partition ${data.partition()} at offset ${data.offset()} in batch $correlationId")
}
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
// some code here
}
override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
for (partition in partitions) {
LOG.info("KafkaEventSource revoked topic ${partition.topic()} partition ${partition.partition()}.")
}
// TODO: what should we do here?
}
override fun onIdleContainer(
assignments: MutableMap<TopicPartition,Long>,callback: ConsumerSeekAware.ConsumerSeekCallback?
) {
// pause M1 -> resume M2,then pause M2 -> resume M3 -> pause M3 -> startC
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。