Empty messages in a Kafka topic from kafka-connect with the JSON spooldir plugin

How can I fix the empty messages that end up in the Kafka topic when using kafka-connect with the JSON spooldir plugin?

I have the following setup: Zookeeper, Kafka, Schema Registry, and kafka-connect, all running in separate pods in Kubernetes. I want to drop some JSON files into a folder watched by kafka-connect with the spooldir plugin so that they get sent to a specific Kafka topic; they are meant to serve as a kind of cold-start (seed) data for our application. Here is the Kubernetes configuration for kafka-connect:

kind: StatefulSet
metadata:
  name: kafka-connect
  labels:
    app.kubernetes.io/name: kafka-connect
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-connect
  serviceName: kafka-connect-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 1
  template:
    metadata:
      labels:
        helm.sh/chart: kafka-0.21.0
        app.kubernetes.io/name: kafka-connect
    spec:
      terminationGracePeriodSeconds: 10
      volumes:
        - hostPath:
            path: /dependency/
            type: Directory
          name: dependency-data
      initContainers:
        - name: wait-for-schema-registry
          image: alpine:3.12.0
          command: ['sh','-c',"until nslookup schema-registry-svc.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for schema registry; sleep 2; done"]
      containers:
        - name: kafka-connect
          image: "confluentinc/cp-kafka-connect:5.5.0"
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - mountPath: /dependency/
              name: dependency-data
          ports:
            - containerPort: 8083
              name: kafka-connect
              protocol: TCP
          command:
            - sh
            - -exc
            - |
              mkdir -p /dependency/unprocessed && \
              mkdir -p /dependency/json/processed && mkdir -p /dependency/json/error && \
              confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43 && \
              confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.0 && \
              exec /etc/confluent/docker/run && \
              sleep infinity
          env:
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: KAFKA_HEAP_OPTS
              value: "-Xms2048M -Xmx2048M"
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components/"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "io.confluent.connect.avro.AvroConverter"
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: "http://schema-registry-svc:8081"
            - name: CONNECT_INTERNAL_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "kafka-connect-config"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "kafka-connect-offset"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "kafka-connect-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: "DEBUG"
            - name: CONNECT_CONNECT_PROTOCOL
              value: "compatible"
            - name: CONNECT_TOPIC_CREATION_ENABLE
              value: "true"
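
Once the pod is up, one way to verify that the spooldir plugin was actually installed is to query the Kafka Connect REST API. This is only a minimal check, assuming the REST port 8083 of the StatefulSet's single replica (pod kafka-connect-0) is forwarded locally:

# Forward the Connect REST port from the StatefulSet's only replica
kubectl port-forward pod/kafka-connect-0 8083:8083 &

# List the installed connector plugins; the spooldir JSON source connector should show up here
curl -s http://localhost:8083/connector-plugins | grep -i spooldir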

The spooldir connector configuration I submit:

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kafka-connect-spooldir",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "tasks.max": "1",
    "input.path": "/dependency/unprocessed/",
    "input.file.pattern": "^.*\\.json$",
    "error.path": "/dependency/json/error",
    "finished.path": "/dependency/json/processed",
    "auto.register.schemas": "false",
    "schema.generation.enabled": "true",
    "topic": "checklist",
    "value.converter.enhanced.avro.schema.support": "true",
    "topic.creation.enable": "true"
  }
}'
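
After POSTing this configuration, the connector's state can be checked through the same REST API; a small sketch using the connector name from the request above:

# Show connector and task state (RUNNING or FAILED, with a stack trace on failure)
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/status

# Inspect the configuration the worker actually stored for this connector
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/config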

There are no exceptions in the kafka-connect logs either, and it moves the files from the unprocessed folder to the processed folder. But when I try to read the topic with kafka-console-consumer, it reports that it consumed 2 messages, yet nothing is actually displayed (just 2 empty lines, and then it hangs). I then registered the checklist schema in the Schema Registry and processed the files again; this time kafka-console-consumer reports 4 consumed messages, but again shows nothing. I also tried kafka-avro-console-consumer, and it consumed 0 messages.

I even noticed that kafka-connect registered a rather "dummy" schema for my object:

{"subject":"checklist-value","version":5,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"fields\":[{\"name\":\"Checklist\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\"}"}

I also checked the Kafka and Schema Registry logs but found nothing suspicious there.

Probably not essential, but I also tried "schema.generation.enabled": "false", and then the logs contain java.lang.NullPointerException: 'value.schema' must be set if 'schema.generation.enabled' = false. I then tried to convert my existing .avsc to a Kafka Connect schema with kafka-connect-avro-converter, but it turned out there is no proper way to print the generated schema, so I guess it is not intended for what I was trying to do.
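
For reference, since the worker's value converter is Avro, reading the topic back would normally be done with the Avro console consumer pointed at the Schema Registry rather than the plain kafka-console-consumer. A sketch, assuming the service names from the StatefulSet above:

# Consume the checklist topic, deserializing values with the schema registered in SR
kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic checklist \
  --from-beginning \
  --property schema.registry.url=http://schema-registry-svc:8081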

What am I missing in the kafka-connect configuration to make it actually use the Schema Registry for serialization and stop sending empty messages to the topic?
