Kafka Streaming任务和内部状态存储的管理

如何解决Kafka Streaming任务和内部状态存储的管理

让我们说我们已经在具有以下属性的2台不同机器(实例)上启动了2个Streaming-Tasks:-

public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";    
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";

并使用上述这些属性,以下是流任务的定义:-

        Map<String,String> changelogConfig = new HashMap();
        changelogConfig.put("min.insyc.replicas","1");
        // Below line not working.
        changelogConfig.put("topic","myChangedTopicLog");
       
        StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),AppSerdes.String(),AppSerdes.Integer()
        ).withLoggingEnabled(changelogConfig);

        kStreamBuilder.addStateStore(kvStoreBuilder);


        KStream<String,String> sourceKafkaStream = kStreamBuilder.stream
                (AppConfigs.topicname,Consumed.with(AppSerdes.String(),AppSerdes.String()));

现在,正如我观察到的那样,卡夫卡在幕后创建了一个主题(为了备份内部状态存储),其名称如下:-StreamsPOC-ProcessorONEStore-changelog

第一个问题是:-是否两个不同的流任务都维护内部状态存储并将其备份到同一主题?

第二个问题是-假设Task-1在分区1上启动,并且在其本地内部状态存储中写入“ ”,然后Task-2开始在Partition-2上工作并说还将写入其各自的本地状态存储中,那么这是否不会引发数据被覆盖的风险,因为这两个任务都将数据备份到相同的changelog主题?

第三个问题是:-如何将自定义名称指定为Change-log-topic?

应高度赞赏响应!

解决方法

首先,对术语有一些思考:“任务”一词在Kafka Stream中具有明确定义的含义,作为用户,您不能自己创建任务。执行程序时,Kafka Streams将创建“任务”,它们是“独立于计算单位”,并为您执行这些任务。 -我猜,“任务”的意思实际上是一个KafkaStreams客户端(称为 instance )。

如果您使用相同的application.id启动多个实例,它们将组成一个使用者组,并且它们将以并行数据的方式共享负载。对于状态存储,每个实例都将托管该存储的 shard (有时也称为分区)。所有实例都使用相同的主题,并且该主题为每个存储分片都有一个分区。从存储分片到更改日志分区有1:1映射。此外,从输入主题分区到任务有1:1映射,在任务和存储碎片之间有1:1映射。因此,总体而言,这是1:1:1:1映射:对于每个输入主题分区,将创建一个任务,并且每个任务都保存一个状态存储区的碎片,每个存储区碎片由changelog主题的一个分区支持。 (最重要的是,输入主题分区的数量决定了您获得多少个并行任务和存储分片,并且changelog主题的创建数量与输入主题的分区数量相同。)

  1. 是的,所有实例都使用相同的changelog主题。
  2. 由于任务是通过分片和changelog主题分区隔离的,因此它们不会相互覆盖。但是,任务的思想是每个任务处理不同的(不重叠)键空间,因此,具有相同<k1,...> 的所有记录应由相同的任务处理。当然,该规则可能会有例外,如果您的应用程序不使用非重叠键空间,则程序将仅被执行(当然,根据您的业务逻辑要求,这可能是正确的还是不正确的。)
  3. 似乎您已经做过:请注意,您只能自定义变更日志主题名称的一部分:<application.id>-<storeName>-changelog-即,您可以选择application.idstoreName。不过,整体命名模式是硬编码的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-