Spring-RabbitMQ 队列长度限制实践

Springboot 版本: 2.7.0

超出队列限制后会发生什么?

  1. 丢弃旧消息:如果没有配置关联死信队列,则丢弃最老的消息。
  2. 将旧消息路由到死信队列:如果配置有关联的死信队列,则将最老的消息路由到死信队列。
  3. 拒绝新消息入队:如果默认行为不满足需求,可以通过参数 overflow 进行修改。
    • reject-publish:拒绝最新的消息发布。如果生成者配置有消息确认,那么broker会异步通知生产者消息发送失败。
    • reject-publish-dlx:除了同reject-publish相同的功能外,还会拒绝死信消息。

怎么设置队列长度?

有两种方式:

  1. 服务端通过policy设置
  2. 客户端在队列声明时使用队列的可选参数进行配置

如果服务端和客户端都做了设置,那么以二者中的小值为准。

服务端通过policy设置

命令行配置

配置命令:

rabbitmqctl set_policy my-pol "^myQueue$" '{"max-length":5, "max-length-bytes":1048576, "overflow":"reject-publish"}'   --apply-to queues --vhost my_vhost
  1. name: my-pol
  2. pattern: ^myQueue$
  3. definition:
    1. max-length: 5; 最多包含5个消息
    2. max-length-bytes:1048576; 最多包含1MiB的消息数据
    3. overflow:reject-publish; 超出限制后直接拒绝新的消息入队

配置结果:

Spring-RabbitMQ 队列长度实践


Spring-RabbitMQ 队列长度实践

管理页面配置

填写内容和命令行是一样的,其结果也是一样的。

Spring-RabbitMQ 队列长度限制实践

客户端申明队列时配置

    @Bean
    public Queue queue() {

        // 常规队列与死信交换机的绑定关系
        Map<String, Object> queueParams = new HashMap<>(2);
        //设置队列长度为5
        queueParams.put("x-max-length", 5);
        queueParams.put("max-length-bytes", 1048576);

        return new Queue(QUEUE_NAME, true, false, false, queueParams);
    }

代码实践

只限制消息长度(丢弃旧消息)

配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost
    # 消息确认(ACK)
    publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
    publisher-returns: true #确认消息已发送到队列(Queue)
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: MANUAL

配置类

定义交换机、队列以及他们之间的绑定关系,并开启生产者消息确认。

@Slf4j
@Configuration
public class RabbitConfiguration {

   public final static String TOPIC_EXCHANGE = "myExchange";

   public final static String QUEUE_NAME = "myQueue";


   @Bean
   public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
      return new RabbitAdmin(connectionFactory);
   }


   @Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate template = new RabbitTemplate(connectionFactory);
      template.setMessageConverter(jsonConverter());
      template.setExchange(TOPIC_EXCHANGE);
      template.setConfirmCallback((correlationData, ack, cause) -> {
         if (ack) {
            log.info("消息:{}发送成功", correlationData.getId());
         } else {
            log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
         }
      });

      template.setMandatory(true);
      template.setReturnsCallback(returned -> {
         log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
      });
      return template;
   }

   // 申明一个常规的交换机
   @Bean
   public TopicExchange topicExchange() {
      return new TopicExchange(TOPIC_EXCHANGE, true, false);
   }


   // 申明一个常规使用的队列
   @Bean
   public Queue queue() {

      // 常规队列与死信交换机的绑定关系
      Map<String, Object> queueParams = new HashMap<>(2);
      //设置队列长度为5
      queueParams.put("x-max-length", 5);
      queueParams.put("x-max-length-bytes", 1048576);
      return new Queue(QUEUE_NAME, true, false, false, queueParams);
   }

   @Bean
   public Binding binding() {
      return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
   }

   @Bean
   public Jackson2JsonMessageConverter jsonConverter() {
      return new Jackson2JsonMessageConverter();
   }
   
}


生产者

连续发布10个消息。

@Component
public class Publisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){

        for (int i = 0; i < 10; i++) {
            User user = new User("kleven", 18, i+1);
            rabbitTemplate.convertAndSend("my.test.message", user, new CorrelationData(user.getId().toString()));
        }
    }

}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
   private static final long serialVersionUID = -5079682733940745661L;

   private String name;
   private Integer age;
   private Integer id;

}

测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {App.class})
public class QueueLengthTest {

    @Autowired
    private Publisher publisher;


    @Test
    public void testSend(){
        publisher.send();
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试结果

10个消息都发布成功,但是队列中只有后5个消息,前5个消息被丢弃。

Spring-RabbitMQ 队列长度限制实践


Spring-RabbitMQ 队列长度限制实践

限制消息长度,并配置死信队列(将旧消息路由到死信队列)

修改配置类增加死信队列,其他保持不变。

配置类

@Slf4j
@Configuration
public class RabbitConfiguration {

    public final static String TOPIC_EXCHANGE = "myExchange";

    public final static String QUEUE_NAME = "myQueue";

    public final static String DEAD_EXCHANGE = "myDeadExchange";

    public final static String DEAD_QUEUE = "myDeadQueue";


    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }


    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter());
        template.setExchange(TOPIC_EXCHANGE);
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息:{}发送成功", correlationData.getId());
            } else {
                log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
            }
        });

        template.setMandatory(true);
        template.setReturnsCallback(returned -> {
            log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
        });
        return template;
    }

    // 申明一个常规的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }



    // 申明一个常规使用的队列
    @Bean
    public Queue queue() {

        // 常规队列与死信交换机的绑定关系
        Map<String, Object> queueParams = new HashMap<>(4);
        queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        queueParams.put("x-dead-letter-routing-key","my.dead.letter");
        //设置队列长度为5
        queueParams.put("x-max-length", 5);
        queueParams.put("x-max-length-bytes", 1048576);
        return new Queue(QUEUE_NAME, true, false, false, queueParams);
    }


    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
    }

    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }



    // 申明一个死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }


    // 申明一个死信队列
    @Bean
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE);
    }


    // 绑定死信交换机和死信队列
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("my.dead.letter");
    }
    
}

测试结果

10个消息均发布成功,前5个旧的消息进入死信队列,后5个消息在常规业务队列。

Spring-RabbitMQ 队列长度限制实践


Spring-RabbitMQ 队列长度限制实践

限制消息长度,并配置 overflow (拒绝新消息入队)

修改配置类增加overflow配置,增加一个消费者,其他保持不变。

配置类

    @Bean
    public Queue queue() {

        // 常规队列与死信交换机的绑定关系
        Map<String, Object> queueParams = new HashMap<>(5);
        queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        queueParams.put("x-dead-letter-routing-key","my.dead.letter");
        //设置队列长度为5
        queueParams.put("x-max-length", 5);
        queueParams.put("x-max-length-bytes", 1048576);
        queueParams.put("x-overflow", "reject-publish");
        return new Queue(QUEUE_NAME, true, false, false, queueParams);
    }

消费者

@Slf4j
@Component
public class Consumer {


    @RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
    public void normalConsumer(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
        // 假设消费者消费一条消息需要2s
        Thread.sleep(2_1000);
        log.info("正常消费者消费 -> {}", user);
        channel.basicAck(deliveryTag, false);

    }

}

测试结果

前6个消息发送成功;后4个消息因为被拒绝所以发送失败。

Spring-RabbitMQ 队列长度限制实践


Spring-RabbitMQ 队列长度限制实践

为什么限制的长度是5却有6个消息发送成功呢?
原因是队列长度(及所占字节数)限制只针对Ready状态的消息,有上图可知,因为我们这次加了一个消费者,其正在消费一个消息但还没有确认,所以有一个消息的状态是Unacked。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340