如何解决用于Java的RabbitMQ库中的多线程
当我运行此代码时:
public static void main(String[] args) throws IOException,TimeoutException {
MainRabbit main = new MainRabbit();
for (int i = 0; i < 5; i++) {
Connection connection = main.createConnection();
Channel channel = connection.createChannel();
channel.basicConsume("queueName",new CustomConsumer());
}
}
private ExecutorService es = Executors.newFixedThreadPool(2);
private ConnectionFactory connectionFactory = createConnectionFactory();
private ConnectionFactory createConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
private Connection createConnection() throws IOException,TimeoutException {
return connectionFactory.newConnection(es);
}
我看到每个与Rabbitmq的tcp连接都有2个单独的线程。在下面的屏幕截图中,这是AMQP连接线程,也是来自池中的名为“ pool-2”的线程。
另外,因为我在ExecutorsService中指定了值2,所以在名为“ pool-1”的池中又创建了2个线程。
-
“ pool-2”来自哪里?
-
在
connectionFactory.newConnection(es)
中es是什么意思? JavaDocs说“为连接器上的使用者提供执行程序线程执行服务”。 这是否意味着在我的示例中,此tcp连接的所有使用者最多可以在2个线程中工作? -
如何开始同时处理多个线程中的传入消息,以便例如并行处理来自同一队列的10条不同消息?
解决方法
RabbitMQ(或AMQP)中的每个连接都被视为单独的双向数据流。在基于线程的语言中,通常将连接与线程相关联,并且不在通道之间共享线程(即“轻量级连接”)
docs from cloudamqp,well established rabbitmq-as-a-service provider
主要原因是要为上游和下游消息流配备独立的工作程序。这样可以避免在重负载情况下出现滞后,并且反压机制将继续起作用,因为客户端将继续交换ack消息
但是,通常,您实际上并不需要为每个客户端建立多个连接。让您的图书馆去做。您可能会担心,因为消耗的消息会触发长时间运行的工作,因此在收到消息时将它们通过线程池进行多路复用,池中的每个线程都会处理一条消息
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。