如何解决多线程消费者 EasyNetQ
我正在尝试使用 EasyNetQ 来使用来自底层 Rabbit MQ 的消息。每个消费者线程应该一次处理一条消息。同一消息不应由超过 1 个消费者线程处理。我用过
advancedBus.Get<MyMessage>(queue);
文档说“要从队列中获取单个消息,请使用 IAdvancedBus.Get 方法”。我注意到当我启动应用程序并且消息已经排队时,2 个线程选择了 1 条消息。 EasyNetQ 也有 Consume 方法。
advancedBus.Consume(queue,(body,properties,info) => Task.Factory.StartNew(() =>
{
//process the message
}));
当新消息进入队列时,这个线程会调用尽可能多的线程,有时可能是数千个。为了限制线程数,我发现了这个 https://www.mariuszwojcik.com/how-to-process-messages-in-parallel-using-easynetq/#:~:text=The%20prefetch%20count%20can%20be,MaxValue)。 示例使用 SubscribeAsync,但我认为这也适用于 Consume。
我不知道如何对 Consume 进行异常处理。
- 它可以抛出什么样的异常?
- 此外,如果出现异常,我是否需要重置我的高级总线或只是重新注册消费或什么都不做(如果它自行恢复)。
- 消费不会两次选择相同的消息吗?
- 如何使用取消令牌处理Consume?我有一个取消令牌,取消后它应该停止消费。
- advancedBus.Get 不适合多个消费者案例吗?
更新 1 第 4 点。
if (!cancellationToken.IsCancellationRequested)
{
cancellationToken.Register(() =>
{
DisposeConsume(consumer);
});
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。