Kafka入门及可视化界面推荐

Kafka

官方文档:

https://kafka.apache.org/documentation/

Kafka 中文文档 - ApacheCN

快速开始

 vim config/server.properties

log.dirs=/usr/local/kafka/logs

关闭 kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空 data 和 kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

参考文章:

Kafka的下载安装以及使用 - 技术栈 (jishuzhan.net)

整合 SpringBoot

参考文章

kafka安装说明以及在项目中使用

SpringBoot整合Kafka

项目启动后,运行

localhost:8080/send?msg=testSpringBootKafka

localhost:8080/test/mock

Telnet 指令

Telnet 是一种用于远程登录和管理网络设备的协议,同时也是基于这个协议的命令行工具。下面是一些常用的 Telnet 命令:

  1. 连接到远程主机:

    telnet <host> <port>
    
  2. <host> 替换为要连接的远程主机的 IP 地址或主机名,将 <port> 替换为要连接的端口号。例如,telnet example.com 23 将连接到 example.com 的 23 端口。

  3. 发送命令或数据:

    在 Telnet 连接建立后,你可以直接在命令行中输入命令或数据,并按 Enter 键发送给远程主机。例如,输入 ls 命令查看远程主机上的文件列表。

  4. 退出 Telnet 连接:

    在 Telnet 连接中,你可以使用以下命令之一来退出:

    • 输入 quit 或 exit 命令并按 Enter 键。
    • 按下 Ctrl + ],然后输入 quit 命令并按 Enter 键。

注意,Telnet 是一种明文协议,数据在传输过程中不会被加密,因此不建议在不安全的网络环境中使用 Telnet。对于安全连接,建议使用 SSH(Secure Shell)协议来进行远程登录和管理。

可视化界面

v1.0.8 版本:gitee来源kafka-console-ui.zip

快速启动

Windows
  1. 解压缩 zip 安装包
  2. 进入 bin 目录(必须在 bin 目录下),双击执行 start.bat 启动
  3. 停止:直接关闭启动的命令行窗口即可
Linux 或 Mac OS
# 解压缩
unzip kafka-console-ui.zip
# 进入解压缩后的目录
cd kafka-console-ui
# 启动
sh bin/start.sh
# 停止
sh bin/shutdown.sh
访问地址

启动完成,访问:http://127.0.0.1:7766

Kafka 如何保证消息不重复消费?

kafka 出现消息重复消费的原因:

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
  • Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  • 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。

  • enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。

    那么这里会有个问题:什么时候提交 offset 合适?

    • 处理完消息再提交:依旧有消息重复消费的风险(例如提交前服务挂掉了),和自动提交一样。
    • 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。

Kafka 的自动提交是怎么样的

Kafka 的自动提交(Automatic Offset Commit)是一种机制,用于确定 Kafka 消费者应该将其当前的消费位移(offset)提交到 Kafka 服务器。

消费位移是一个指示 Kafka 主题分区中消费者已读取到的位置的值。

自动提交可以帮助简化消费者的管理,但它也涉及一些注意事项和权衡。

以下是关于 Kafka 自动提交的一些重要信息

  1. 默认行为:Kafka 消费者默认启用了自动提交。这意味着消费者会自动定期将当前的消费位移提交到 Kafka 服务器,而无需显式调用 commitSynccommitAsync 方法。

  2. 提交频率:自动提交的频率由消费者配置参数 auto.commit.interval.ms 决定,默认值为 5000 毫秒(5 秒)。这意味着每隔 5 秒,消费者将提交当前的位移。

  3. 幂等性问题:自动提交可能导致幂等性问题。如果消息在处理过程中成功处理但在位移提交之前失败,那么消息可能会被重新处理,这可能导致副作用。为了解决这个问题,你可以采用幂等性的消息处理逻辑,或者使用手动位移提交来更精确地控制位移的提交时机。

  4. 配置禁用:如果你希望完全控制位移的提交,可以禁用自动提交。通过将 enable.auto.commit 配置为 false,你可以关闭自动提交功能,然后在适当的时候手动调用 commitSynccommitAsync 来提交位移。

    enable.auto.commit=false
    
  5. 手动提交:手动提交允许你在消息处理成功后显式地提交位移。这可以确保只有在消息成功处理后才提交位移,从而避免重复处理消息的问题。

    consumer.poll(Duration.ofMillis(100)); // 拉取消息
    // 处理消息
    consumer.commitSync(); // 手动提交位移
    

总之,自动提交是 Kafka 消费者的默认行为,但在一些情况下,特别是需要确保幂等性消息处理成功后才提交位移的情况下,可能需要禁用自动提交并使用手动提交。自动提交的频率可以通过配置进行调整。

讲讲 kafka 的自动提交,是在处理完消息前提交还是处理完消息之后提交

Apache Kafka 的消费者有两种方式来提交消费的进度,也就是提交 offset,一种是自动提交,另一种是手动提交。

这里我们主要讨论自动提交。

在 Kafka 中,自动提交是指:消费者自动地定期提交已经拉取的消息的 offset。

如果启用了自动提交,那么 auto.commit.interval.ms 配置的时间到了,消费者就会提交最新的 offset,无论消息是否已经处理完毕。

这意味着,自动提交的 offset 是在拉取到消息后就可能提交,而不一定是在处理完消息之后提交。这将导致在消费者崩溃或者重新启动时,可能会出现消息重复处理或者消息丢失的情况。

  • 例如,如果消费者已经拉取了一些消息,但还没来得及处理,这时候自动提交触发,offset 被提交。如果此时消费者崩溃,再次启动时,它会从提交的 offset 开始消费,导致之前拉取但未处理的消息丢失。

  • 另一方面,如果消费者拉取了一些消息并处理了它们,但在自动提交提交触发之前消费者崩溃了,那么 offset 并没有被提交。当消费者再次启动时,它会从上次提交的 offset 开始消费,这将导致处理过的消息被重复处理。

因此,

  • 如果你的应用对消息的处理具有幂等性(即处理多次和处理一次的效果一样),那么使用自动提交可能是一个简单且有效的选择。
  • 但如果你的应用需要精确地处理每一条消息,即不能丢失消息也不能重复处理消息,那么你可能需要考虑使用手动提交 offset,这样可以更精确地控制何时提交 offset。

自动提交与手动提交

  1. 自动提交(Auto Commit):这是指 Kafka 消费者定期自动将当前已消费的消息位移(offset)提交到 Kafka 服务器,而无需显式的用户干预。自动提交的频率由配置参数控制,通常情况下是定期的。这种提交位移的方式可能会导致一些消息被处理但尚未确认已成功处理的情况。如果发生故障,这些消息可能会被重新处理,从而可能导致消息的重复消费。
  2. 位移/手动 提交(Offset Commit):这是指消费者在成功处理一条消息后,显式地将该消息的位移提交到 Kafka 服务器。位移提交是由用户代码控制的,通常在处理逻辑执行后,确认消息已被成功处理后执行。这种提交位移的方式确保了消息只有在成功处理后才会被视为已消费,从而避免了消息的重复处理。

一般来说,位移提交更可靠,因为它允许消费者完全控制位移何时提交,以确保消息被成功处理后才被标记为已消费。自动提交则更容易实现,但在某些情况下可能导致消息的重复消费,因此需要根据具体的需求和应用场景来选择使用哪种方式。

所以,自动提交是指消费者定期自动提交位移,而位移提交是指显式地提交位移,由用户代码控制。

Kafka 重试机制

消费失败会怎样?

在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。

因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。

默认会重试多少次?

Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试

如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。

如何在重试失败后进行告警?

自定义重试失败后逻辑,需要手动实现,可以通过【重写 DefaultErrorHandlerhandleRemaining 函数,加上自定义的告警等操作来】实现。

@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {

    public DelErrorHandler(FixedBackOff backOff) {
        super(null,backOff);
    }

    @Override
    public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        super.handleRemaining(thrownException, records, consumer, container);
        log.info("重试多次失败");
        // 自定义操作
    }
}

DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。

重试失败后的数据如何再次处理?

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。

重试注解 @RetryableTopic

@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。

// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
        attempts = "5",
        backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
    log.info("kafka customer:{}", message);
    Integer n = Integer.parseInt(message);
    if (n % 5 == 0) {
        throw new RuntimeException();
    }
    System.out.println(n);
}

什么是死信队列?

**死信队列(Dead Letter Queue,简称 DLQ)**是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。

  • 当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。
  • 在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。

Kafka 工作原理

生产

消息经过序列化后,通过不同的分区策略,找到对应的分区。

相同主题和分区的消息,会被存放在同一个批次里,然后由一个独立的线程负责把它们发到 Kafka Broker 上。

分区的策略包括顺序轮询、随机轮询和 key hash 这 3 种方式,那什么是分区呢?

分区是 Kafka 读写数据的最小粒度,比如主题 A 有 15 条消息,有 5 个分区,如果采用顺序轮询的方式,15 条消息会顺序分配给这 5 个分区,后续消费的时候,也是按照分区粒度消费。

由于分区可以部署在多个不同的机器上,所以可以通过分区实现 Kafka 的伸缩性,比如主题 A 的 5 个分区,分别部署在 5 台机器上,如果下线一台,分区就变为 4。

消费

  • Kafka 消费是通过消费群组完成,【同一个】消费者群组,一个消费者可以消费多个【分区】,但是一个【分区】,只能被一个消费者消费。
  • 如果消费者增加,会触发 Rebalance,也就是分区和消费者需要重新配对
  • 不同的消费群组互不干涉

学习参考

原文地址:https://blog.csdn.net/qq_54088234/article/details/135191907

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署