实现Kafka的优先级消费

优先级消费是业务中经常碰到的场景。对于同一类消息,消息内容可能由于其业务上的属性不同,而具有不同的“重要程度”,比如在一个消息推送系统中,一条告警消息显然比一条营销消息重要的多,我们希望即使队列中已经有了大量消息的情形下,优先级更高的消息,仍能够实现“插队”的优先消费,以保证更重要的消息不回被低优先级的消息阻塞。

概述

实际上,这个需求也在Kafka的官方需求中(KIP-349: Priorities for Source Topics),目前的状态是Under Vote,这个Proposal是2019年提出来的,看来官方的方案是指望不上了,只能找些第三方的轮子,或者自己来实现。

在每个Topic中,Kafka顺序写以获得尽可能获得高吞吐,使用Index文件来维护Consumer的消息拉取,维护维度是Offset。Offset不包含优先级语义,但需要顺序语义,优先级语义本身包含非顺序语义,因此就语义来看,以Offset为维度的拉模型MQ和优先级需求本质是冲突的。所以对于单个Topic,在Kafka原生实现消息优先级可行性不高。
因此很自然的,我们能够想到,可以创建多个Topic,每个Topic代表一个优先级。

  • 在生产者端,引入优先级字段,以数字来表示,数值越高优先级越高。在向broker推消息时,根据其优先级推送到不同的topic中。
  • 在消费者端,通过实现对不同优先级Topic的消费,以实现消息的优先消费。

对于消息的生产,实现起来比较简单,问题的难点在于消费者端如何消费不同Topic的消息,以实现高优先级的消息能够被优先处理?

这里大致有三种方案

  1. 对于不同的topic,各个consumer分别拉取,拉去后在服务内部使用优先队列进行缓冲。
  2. 使用一个consumer,优先拉取高优先级消息,没有的话再拉去次优先级消息。
  3. 使用不同的consumer分别拉取各个topic,但是拉取的消息数量不同,对于高优先级的消息,拉取的“配额”更多。

服务内部优先队列缓冲

1

对于这种方案,为了避免OOM,需要使用有界优先队列。

然而对于有界优先队列,在消息消费逻辑复杂,处理速度不够快时,可能会导致优先队列的阻塞。

同时在阻塞时,消息的放入无法保证按照优先级放入,影响优先级的功能。

这里一个可能的做法是不在服务内部的优先队列中维护,而是将消息再放到Redis Zset中进行排序。这样会避免OOM和阻塞的问题,但是会增加系统的复杂度。

优先拉取高优先级Topic

这种方法看起来还不错,但是实现起来逻辑比较复杂。

在每次拉取消息时,都需要先尝试拉取高优先级数据,没有数据再拉取低优先数据。这样的问题有

  1. 每次都要串行的判断各个优先级数据是否存在,实际的场景中往往是高优消息时比较少的,每次轮询到较低优先级才拉取到消息,性能上可能存在一定问题。
  2. 即使高优topic中有消息,也是比较少的,实际应用中kafka需要批量拉取消息,在高优消息较少,没有达到每次的拉取数量时,该如何去处理,这里也比较麻烦。
  3. 通过这种方式拉取消息,会导致低优先级的消息完全得不到消费的机会。

同时拉取多Topic,“权重”不同

2

对于这种方式,这里有一个开源的实现flipkart-incubator/priority-kafka-client
对于每次拉取,需要设定每次拉取的数量

// CapacityBurstPriorityKafkaConsumer.java
void updateMaxPollRecords(KafkaConsumer<K, V> consumer, int maxPollRecords) {
    try {
        Field fetcherField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField(FETCHER_FIELD);
        fetcherField.setAccessible(true);
        Fetcher fetcher = (Fetcher) fetcherField.get(consumer);
        Field maxPollRecordsField = Fetcher.class.getDeclaredField(MAX_POLL_RECORDS_FIELD);
        maxPollRecordsField.setAccessible(true);
        maxPollRecordsField.set(fetcher, maxPollRecords);
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}

对于每次拉取的数量,按照优先级的“权重”不同,分配到不同的topic上。默认的分配策略是按照指数分配。

比如对于每次拉取50个记录,3个优先级的情况下,三个优先级的比例按指数分布,为1:2:4,实际的配额即为7:14:29。
这里有一个很明显的问题是对于高优先级的数据,如果每次拉取不到指定的数量,这部分配额相当于被浪费掉了,这样会影响整体的拉取性能。

对于这种情况,代码中为每个优先级维护了一个“滑动窗口”来记录近期拉取的数量的历史记录,在拉取前,会根据历史拉取情况来进行配额的rebalance,以此来实现配额的动态分配。

for (int i = maxPriority - 1; i >= 0; --i) {
    if (isEligibleToBurst(i)) {
        int burstCapacity = burstCapacity(i);
        if (burstCapacity > 0) {
            priorityBurst = i;
            int finalCapacity = burstCapacity + maxPollRecordDistribution.get(i);
            log.info("Burst in capacity for priority {} to {}", priorityBurst, finalCapacity);
            updateMaxPollRecords(consumers.get(priorityBurst), finalCapacity);
        }
        break;
    }
}

感兴趣的朋友可以看一下代码里的具体实现。这里就不再赘述了。

3

在这里插入图片描述

可以看到最终的优先级效果,对于图1,消费者能够按照优先级进行对高优先级的优先消费,当高优先级消费完成后,将配额分配给低优先级topic。对于图2,当高优先级的的数据再次生产后,能够再对优先级进行优先消费,实现了优先级的功能。

在项目中,我也是最终使用了这种方案。
由于在maven仓库中没有该开源实现,我直接将jar包放到了源代码的lib目录下,通过本地引入的方式导入了库。

<dependency>
    <groupId>com.flipkart</groupId>
    <artifactId>priority-kafka-client</artifactId>
    <version>1.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/priority-kafka-client-1.0.0.jar</systemPath>
</dependency>

在打包时,也需要增加相应的配置

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <includeSystemScope>true</includeSystemScope>
            </configuration>
        </plugin>
    </plugins>
</build>

引入后,具体的使用还是比较简单的。为PriorityKafka进行相应的配置,就可通过依赖注入的方式进行使用了。

@Configuration
public class PriorityKafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.max-priority}")
    private Integer maxPriority;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public PriorityKafkaProducer<Integer, String> createPriorityProducer() {
        Properties props = new Properties();
        props.put(ClientConfigs.MAX_PRIORITY_CONFIG, String.valueOf(maxPriority));
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        PriorityKafkaProducer<Integer, String> producer = new PriorityKafkaProducer<Integer, String>(props);
        return producer;
    }

    @Bean
    public org.apache.kafka.clients.consumer.Consumer<Integer, String> createPriorityConsumer() {
        Properties props = new Properties();
        props.put(ClientConfigs.MAX_PRIORITY_CONFIG, String.valueOf(maxPriority));
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "10500000");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10500000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "120000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        org.apache.kafka.clients.consumer.Consumer<Integer, String> consumer
                = new CapacityBurstPriorityKafkaConsumer<Integer, String>(props);
        return consumer;
    }
}

以下是我对原先使用的KafkaProducer以及通过@KafkaListener进行消费的改造,改动量还是比较小的。

@Resource
private PriorityKafkaProducer<Integer, String> producer;

/**
 * 推送数据到kafka broker, 带优先级
 * @param obj
 */
public void send(int priority, Object obj, String topic) {
    String jsonStr = JSONObject.toJSONString(obj);
    //发送消息

    ProducerRecord<Integer, String> record
            = new ProducerRecord<Integer, String>(topic, jsonStr);
    producer.send(priority, record, (recordMetadata, exception) -> {
        if(exception == null) {
            //成功的处理
            log.info(topic + " - 生产者 发送消息成功:" + jsonStr);
        } else {
            //发送失败的处理
            log.error(topic + " - 生产者 发送消息失败:" + exception.getMessage());
        }
    });
}
//优先级消费,consumer需注入 @Resourceprivate Consumer<Integer, String> consumer;

ConsumerRecords<Integer, String> records = consumer.poll(100);
if (records.isEmpty()) {
    break;
}
log.info("poll {} data form topic {}", records.count(), MESSAGE_TOPIC);
for (TopicPartition partition : records.partitions()) {
    for (ConsumerRecord<Integer, String> record : records.records(partition)) {
        PushMessage message = JSONObject.parseObject(record.value(), PushMessage.class);
        pushHandlerService.handlePushMsg(message);
    }
}
consumer.commitAsync();

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340