Spring Batch Kafka Consumer Job:跨多个JVM进程的荣誉消息组

如何解决Spring Batch Kafka Consumer Job:跨多个JVM进程的荣誉消息组

我有一个简单的Spring Batch Kafka Consumer Job,它可以从Kafka主题中读取数据并将数据写入文件中。

我想到了生成 5个实例的我的Kafka消费者工作,以便该工作可以更快地完成。也就是说,我启动了5次程序,以便在自己的JVM进程中启动 5个消费者Jobs

此方法的直接问题是将有5个进程写入同一文件。我通过在文件名后附加一个唯一的进程ID来解决此问题。我更新的writer bean如下:

private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();    

@Bean
public FlatFileItemWriter<String> testFileWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(
            "I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
    writer.setAppendAllowed(false);
    writer.setShouldDeleteIfExists(true);
    DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    writer.setLineAggregator(lineAggregator);
    return writer;
}

通过将时间戳附加到输出文件名,可以确保每个Consumer JVM进程都写入其自己的文件。

当我最终启动同一程序的5个实例(JVM进程)时,我的期望是,如果在其自己的JVM进程中运行的一个使用者作业从分区中读取一条消息,则在他们自己的JVM进程中运行的其他使用者作业将不会从同一分区再次读取同一条消息(因为所有5个Java进程将使用相同的使用者组,即 mygroup

但是,我可以看到每个使用者作业进程(JVM)最终都读取了所有消息。结果,我现在有5个文件,每个文件包含相同的内容。示例输出文件名以及每个文件中的记录数,以进行更好的说明:

output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records

Total records: 5 million

问题:如何配置Spring Batch作业,以使即使使用该使用者作业启动了多个Java进程,该Java进程也只能读取尚未由同一组中的使用者读取的数据。是作为单独的Java进程启动的?

这是我的预期输出(仅代表):

output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records 

Total records : 1 million

解决方法

在同一组ID中运行具有相同消费者ID的多个Kafka消费者实例并不能帮助您实现并行性。

可以通过使用多个具有不同消费者ID和相同消费者组ID的消费者来实现Kafka消费者中的并行化。消费者组是一个组中多个消费者的分组机制。数据在组的所有使用者之间平均分配,组中没有两个使用者接收相同的数据。

在将分区分配给使用者之前,Kafka首先会检查是否存在具有给定组ID的现有使用者。 当不存在具有给定组ID的现有使用者时,它将为该新使用者分配该主题的所有分区。 当已经有两个使用给定组ID的消费者并且第三个消费者想要使用相同的组ID进行消费时。它将在所有三个使用者之间平均分配分区。不会将两个具有相同group-id的使用者分配到同一分区。

示例 假设有一个包含4个分区和两个使用者的主题,consumer-Aconsumer-B希望通过组ID为my-consumer-group的使用者使用,那么Kafka将为每个使用者分配相等数量的分区consumer-A2 to the consumer-B的2。

在您的用例中,由于Kafka主题包含4个分区,因此您可以使用4个使用者,每个使用者具有不同的使用者ID和相同的组ID。

,

创建KafkaItemReader时,可以指定要从哪个分区读取:

KafkaItemReader reader = new KafkaItemReader(myConsumerProperties,"topic1",0)

上述阅读器将从0中的分区topic1中读取消息。因此,在您的情况下,您可以并行运行作业,并配置每个作业以读取来自不同主题的消息(例如,将主题/分区作为作业参数传递)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-