如何解决RabbitMQ,头交换,未由头路由的消息x-match = all
我正在尝试与队列建立报头交换,其中队列根据收件人报头路由邮件。
交换类型为标头。
到目前为止,该类能够连接到交换并将消息馈送到队列。 它还能够订阅队列并接收消息。每当订户的连接被取消时,它也会关闭连接。
当前的问题是邮件不是由收件人的标头值路由的。
提供以下课程:
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@Slf4j
public class MyQueue {
private final ConnectionFactory connectionFactory;
private Channel channel;
public MyQueue(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
Map<String,Object> headers = new HashMap<>();
headers.put(RabbitMqConfig.MATCH_HEADER,message.getRecipient());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
.headers(headers).build();
log.info("Sending message to {}",headers);
channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME,"",props,message.getMessage().getBytes(StandardCharsets.UTF_8));
log.info("RabbitMQ sent message {} to {}",message.getMessage(),message.getRecipient());
return "ok";
} catch (TimeoutException e) {
log.error("Rabbit mq timeout",e);
} catch (IOException e) {
log.error("Rabbit mq io error",e);
}
throw new UndeliverableMessageException();
}
public Flux<String> listenMessages(String recipient) throws IOException,TimeoutException {
Connection connection = connectionFactory.newConnection();
this.channel = connection.createChannel();
// The map for the headers.
Map<String,Object> headers = new HashMap<>();
headers.put("x-match","all");
headers.put(RabbitMqConfig.MATCH_HEADER,recipient);
final String[] consumerTag = new String[1];
Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
{
try {
log.info("Binding to {}",headers);
channel.queueBind(RabbitMqConfig.QUEUE_NAME,RabbitMqConfig.EXCHANGE_NAME,headers);
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
log.info("Subscriber {} received a message {} with headers {}",recipient,delivery.getEnvelope(),delivery.getProperties().getHeaders());
sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,true,deliverCallback,tag -> {
sink.complete();
});
} catch (IOException e) {
log.error("RabbitMQ IOException ",e);
}
}
});
return as.doOnCancel(() -> {
try {
if (consumerTag[0] == null) {
log.error("RabbitMQ uncloseable subscription,consumerTag is null!");
channel.close();
return;
}
channel.basicCancel(consumerTag[0]);
channel.close();
log.info("RabbitMQ CANCEL subscription for recipient {}",recipient);
} catch (IOException | TimeoutException e) {
log.error("RabbitMQ channel close error",e);
}
});
}
interface MessageListener<T> {
}
}
通过以下调用声明交换
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME,BuiltinExchangeType.HEADERS,true);
绑定收件人日志:
Binding to {x-match=all,message-recipient=mary}
Binding to {x-match=all,message-recipient=james}
Binding to {x-match=all,message-recipient=john}
但是,消息不匹配,就像它们是随机路由一样
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1,redeliver=false,exchange=my-exchange,routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1,routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1,routingKey=) with headers {message-recipient=james}
x-match: all
为什么不匹配?
解决方法
在阅读了@Gryphon在订阅方发表的评论后,我最终为每个参与者创建了一个队列。
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,true,false,null)
在发布者端,代码保持不变,消息被发送到交换,交换将基于x-match: all
配置处理路由,将消息路由到适当的队列。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。