如何解决Vertx:Verticle的多个实例中的多个使用者
在我的应用程序中,我有两个顶点(标准顶点和非工人顶点),其中一个顶点VerticleA
产生一些消息,而另一个顶点VerticleB
消耗它们。在VerticleB
中,我创建了几个使用者来使用这些消息。
class VerticleA : AbstractVerticle() {
val publisher: MessageProducer<String>
fun start(promise: Promise<Void>) {
publisher = vertx.eventBus().sender<String>("address")
.setWriteQueueMaxSize(queueSize)
vertx.setPeriodic(timeout) {
if(publisher.writeQueueFull())
return
getAsyncMessages() { messages ->
messages.forEach { publisher.wirte(it) }
}
}
}
}
class VerticleB : AbstractVerticle() {
val consumers: List<MyConsumers>
override fun start(promise: Promise<Void>) {
// Some initialization
consumers = (1..count).map { createConsumer() }
}
fun createConsumer(): MessageConsumer<String> {
val consumer = vertx.eventBus().consumer<String>("address")
consumer.handler { message ->
consumer.pause()
asyncProcess(message) { consumer.resume() }
}
return consumer
}
}
当我仅使用VerticleB
的单个实例(即使用DeploymentOptions.setInstances(1)
)部署应用程序时,一切正常。但是,当我将数字实例设置为不止一个时,使用方将在处理少量初始消息后停止处理消息。
从日志中,我可以看到每个使用者都消耗了几条消息,然后停止消耗。暂停和恢复日志也成对出现,即每个pause()
调用都有一个resume()
调用,因此即使发生错误,asyncProcess()
也会调用回调。消费者还没有结束。
我在这里https://vertx.io/docs/vertx-core/java/阅读了核心手册,但没有发现任何可以解决此问题的方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。