Kafka RoundRobin 分区器不向所有分区分发消息

如何解决Kafka RoundRobin 分区器不向所有分区分发消息

我正在尝试使用 Kafka 的 RoundRobinPartitioner 类在所有分区之间均匀分布消息。我的Kafka主题配置如下:

名称:multischemakafkatopicodd

分区数:16

复制因子:2

比如说,如果我生成 100 条消息,那么每个分区应该有 6 或 7 条消息。但是,我得到了类似的东西:

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0

我想可能是我没有产生足够的消息,所以我尝试使用 1M 记录并将分区数设置为奇数:

主题:multischemakafkatopicodd

分区数:31

复制因子:2

...我明白了。这次每个分区的消息数量分布有些均匀。

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684

我再次进行了相同的测试,但将分区数量减少到 8,我得到了这个结果,我们可以清楚地看到一些分区有接近 15K 的消息,而其他分区有大约 10K:

multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599

我做错了什么还是它应该如何工作? 为什么消息分布如此不平等?

如果有人能帮助我,那就太好了。谢谢。

解决方法

据我所知,分区器运行良好。但您必须了解生产者所做的优化以最大限度地提高性能:

  • 生产者不会为每次发送调用将每条消息生成到不同的分区,因为这会矫枉过正。

  • Round-Robin 保证了类似的分布,但可以批量发送。这意味着,它会根据 {{1} }的代码:

    RoundRobinPartitioner

int part = Utils.toPositive(nextValue) % availablePartitions.size(); 是一个 nextValue,每次分区/发送调用都会增加 1。因此,余数将始终增加 1 (以循环方式,例如有 4 个分区:AtomicInteger,假设在此过程中没有分区被声明为不可用。如果发生这种情况,循环可能看起来像 0-1-2-3-0-1-2-3-...


示例

  • 主题有 4 个分区
  • 每个分区生产者分区线程缓冲区保存3条消息

(消息号计数器以 0 - 0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-... 开头)

new AtomicInteger(0)

当生成第 9 条消息时,第一个分区的缓冲区已满(因为它已经保存了 3 条消息),因此准备好发送到 kafka。如果您立即停止该过程,4 个分区将如下所示:

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
        4%4                0
        5%4                1
        6%4                2 
        7%4                3
        8%4                0
        ...               ...

在生成第 10 条消息时,第二个分区的缓冲区也将准备好发送出去,主题将如下所示:

    Partition    Offset
       0           3
       1           0
       2           0
       3           0

在现实生活中,缓冲区通常会保存大量消息(这也可以调整)。例如,假设存储了 1000 条消息。对于相同的场景,分区看起来像:

    Partition    Offset
       0           3
       1           3
       2           0
       3           0

因此增加了分区之间的“视觉”差异。更大的批量大小/缓冲区大小会更加臭名昭著。

这与生产者的 Partition Offset 0 1000 1 1000 2 0 3 0 线程本身的性质有关:默认情况下,它不会独立发送每条消息,而是将它们存储以便在每次代理调用时remainder,优化系统性能。

批处理是提高效率的重要驱动因素之一,并且能够实现 批处理 Kafka 生产者将尝试在内存中积累数据 并在单个请求中发送更大的批次

如果生产者停止/启动,这种不平衡可能会更加臭名昭著,因为无论先前选择的分区如何,它都会重新启动机制(因此它可以开始发送到在停止之前选择的同一分区,因此增加了与上次执行的其他非选举分区的差异).

在新的执行中,缓冲区将全部为空,因此无论哪个分区接收最多,进程都会重新启动。

因此,您在此处停止该过程:

partitioner

保存每个主题的消息数量计数器的映射重新启动,因为它不是代理的一部分,来自生产者的 send multiple messages 类。如果生产者没有正确关闭和/或刷新,那些缓存的消息也将丢失。所以,在这种情况下,你得到的是之前逻辑的重复:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

这会在某些时候导致这种情况:

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
                 (...)

这是由发送过程的非连续执行产生的不平衡,但它超出了 Partition Offset 0 2000 1 2000 2 0 3 0 的界限,其性质基于连续过程(不间断)。

您可以通过在发送消息时检查每个分区的偏移量来验证此行为:只有当所选分区存储 n 消息时,下一个选举分区将获得其批量 n 消息。

注意:示例中显示的数字引用了“完美”场景;在现实生活中,无论缓冲区大小如何,消息也可能被撤销、压缩、失败、刷新,分区不可用,......导致偏移量,如您的问题所示。

最后一个冲洗场景示例:

RoundRobinPartitioner

进程已停止,但生产者已正确关闭并刷新其消息,因此主题如下所示:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

进程重新启动。刷新第一个分区的缓冲区后,看起来像:

    Partition    Offset
       0           1997
       1           1996
       2           999
       3           998

因此增加了对该机制的“公平性”的混淆。但这不是它的错,因为在分区器的映射、计数器和缓冲区中没有持久性。如果您让流程连续执行数天,您会发现它确实以“near-equal”的方式平衡了消息。


but of the Partitioner相关方法

    Partition    Offset
       0           2997
       1           1996
       2           999
       3           998

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-