为什么终止进程并再次运行后,Node中的Kafka Streams向我发送重复的有效载荷?

如何解决为什么终止进程并再次运行后,Node中的Kafka Streams向我发送重复的有效载荷?

我对整个微服务体系结构还很陌生,所以我一直在使用Kafka来检索数据以及向微服务发送数据。我知道如何通过常规的消费者和生产者很好地发送和检索数据(尽管我不是专家),但是最近我了解了Kafka Streams,并希望使用它来简化我正在使用的数据。我能够从另一个微服务中收集数据,但是我注意到,如果我终止该过程并再次运行它,我将获得数据,并在其下再添加一个相同数据的副本。而且,如果我要终止该过程并再次运行它,那么我将在最后一个重复项的下面有另一个重复的数据!即使我停止运行提供数据的其他微服务,我也能够收集数据,因此我假设数据已保存在某处。如果我终止一次并再次运行它,它将是这样的样子。

TOPIC: requestAllUserData
[kafka-producer -> requestAllUserData]: broker update success
[
  {
    id: 1,first_name: 'John',last_name: 'Doe',city: 'Northridge',age: 25,gender: 'Male',profession: 'Teacher',email: 'johntho213@gmail.com',username: 'JohnTho213',created_at: '06-05-2019',deleted_at: '09-29-2020'
  },{
    id: 2,first_name: 'Mike',last_name: 'Brown',city: 'Topanga',age: 19,profession: 'Senator',email: 'mikebrown@gmail.com',username: 'MBrownYe',created_at: '07-04-18',deleted_at: null
  }
]
[
  {
    id: 1,deleted_at: null
  }
]

如您所见,我收到了两次有效载荷,而我只想看到一次。有谁知道这种意外行为的可能原因?我在这里关注了文档-> https://nodefluent.github.io/kafka-streams/docs/

我包含的代码与下面的文档中的代码几乎没有什么

"use strict";
const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./KSConfig.js");
const kafkaStreams = new KafkaStreams(config);
const stream = kafkaStreams.getKStream();

stream
  .from("AllUserDataResponse")
  .forEach(message => console.log(JSON.parse(message.value)));

function streamTest(){
  stream.start().then(() => {
      console.log("stream started,as kafka consumer is ready.");
  },error => {
      console.log("streamed failed to start: " + error);
  });
}

exports.streamTest = streamTest;

我正在Server.js文件中运行此文件,尽管我认为这些信息并没有真正的帮助。另外,我一直在尝试收集数据并将其存储在列表或数组中,但是这样做还没有任何运气,因此,如果有人能帮助我做到这一点,也将不胜感激。哦,这是我的KSConfig文件,如果有帮助的话。

"use strict";


const batchOptions = {
    batchSize: 5,commitEveryNBatch: 1,concurrency: 1,commitSync: false,noBatchCommits: false
};

const nativeConfig = {
    noptions: {
        "metadata.broker.list": "localhost:9092",//native client requires broker hosts to connect to
        "group.id": "kafka-streams-test-native","client.id": "kafka-streams-test-name-native","event_cb": true,"compression.codec": "snappy","api.version.request": true,"socket.keepalive.enable": true,"socket.blocking.max.ms": 100,"enable.auto.commit": false,"auto.commit.interval.ms": 100,"heartbeat.interval.ms": 250,"retry.backoff.ms": 250,"fetch.min.bytes": 100,"fetch.message.max.bytes": 2 * 1024 * 1024,"queued.min.messages": 100,"fetch.error.backoff.ms": 100,"queued.max.messages.kbytes": 50,"fetch.wait.max.ms": 1000,"queue.buffering.max.ms": 1000,"batch.num.messages": 10000
    },tconf: {
        "auto.offset.reset": "earliest","request.required.acks": 1
    },batchOptions
};

module.exports = {
    nativeConfig
};

如果您有任何其他问题,我会再答复。任何帮助或建议,将不胜感激。谢谢!

解决方法

首先,您需要知道Kafka不能保证一次交货(我们确实有幂等的生产者和交易,但是为了简单起见,在此讨论中我会假装它们不存在 em>)。可能有很多情况会导致重复。

在您的用例中,说消费者读取了一个消息批(m1-m5)。它已处理(并作为日志的一部分打印),但是在将其提交给代理之前,该过程已终止。在这种情况下,再次启动该进程时,它将重新读取同一消息(m1-m5),因为它从未提交给代理。

话虽如此,复制品也可以来自生产商。假设kafka生产者发送了一条消息,而代理收到了该消息,但是由于某些网络故障,错过了来自代理的确认。在这种情况下,生产者可以重新发送导致重复的消息。因此,总而言之,您应该期望在kafka管道中存在重复项,并因此使其具有幂等性。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-