Kafka控制器事件处理全流程分析

前言

大家好,我是 yes。

这是Kafka源码分析第四篇文章,今天来说说 Kafka控制器,即 Kafka Controller

源码类的文章在手机上看其实效果很差,这篇文章我分为两部分,第一部分就是直接图文来说清整个 Kafka 控制器事件处理全流程,然后再通过Controller选举流程进行一波源码分析,再来走一遍处理全流程。

​一些在手机上看的同学可以直接看前半部分,没有一堆代码比较舒适,也能看明白整个流程,后面源码部分看个人了。

不过建议电脑端看效果更佳。

正文

在深入源码之前我们得先搞明白 Controller是什么?它有什么用?这样在看源码的时候才能有的放矢

Controller核心组件,它的作用是管理和协调整个Kafka集群

具体管理和协调什么呢?

  • 主题的管理,创建和删除主题;
  • 分区管理,增加或重分配分区;
  • 分区Leader选举;
  • 监听Broker相关变化,即Broker新增、关闭等;
  • 元数据管理,向其他Broker提供元数据服务;

为什么需要Controller​?

我个人理解:凡是管理或者协调某样东西,都需要有个Leader,由他来把控全局,管理内部,对接外部,咱们就跟着Leader干就完事了。这其实对外也是好的,外部不需要和我们整体沟通,他只要和一个决策者交流,效率更高。

再来看看朱大是怎么说的,以下内容来自《深入理解Kafka:核心设计与实践原理》。

在Kafka的早期版本中,并没有采用 Kafka Controller 这样一概念来对分区和副本的状态进行管理,而是依赖于 ZooKeeper,每个 broker都会在 ZooKeeper 上为分区和副本注册大量的监听器(Watcher)。
当分区或副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖 ZooKeeper 的设计会有脑裂、羊群效应,以及造成 ZooKeeper 过载的隐患。在目前的新版本的设计中,只有 Kafka Controller 在 ZooKeeper 上注册相应的监听器,其他的 broker 极少需要再监听 ZooKeeper 中的数据变化,这样省去了很多不必要的麻烦。

简单说下ZooKeeper

了解了 Controller的作用之后我们还需要在简单的了解下ZooKeeper,因为Controller是极度依赖ZooKeeper的。(不过社区准备移除ZooKeeper,文末再提一下)

ZooKeeper是一个开源的分布式协调服务框架,最常用来作为注册中心等。ZooKeeper的数据模型就像文件系统一样,以根目录 "/" 开始,结构上的每个节点称为znode,可以存储一些信息。节点分为持久节点和临时节点,临时节点会随着会话结束而自动被删除。

并且有Watcher功能,节点自身数据变更、节点新增、节点删除、子节点数量变更都可以通过变更监听器通知客户端。

图来自《ZooKeeper》

Controller是如何依赖ZooKeeper的

每个Broker在启动时会尝试向ZooKeeper注册/controller节点来竞选控制器,第一个创建/controller节点的Broker会被指定为控制器。这就是是控制器的选举

/controller节点是个临时节点,其他Broker会监听着此节点,当/controller节点所在的Broker宕机之后,会话就结束了,此节点就被移除。其他Broker伺机而动,都来争当控制器,还是第一个创建/controller节点的Broker被指定为控制器。这就是控制器故障转移,即Failover

当然还包括各种节点的监听,例如主题的增减等,都通过Watcher功能,来实现相关的监听,进行对应的处理。

Controller在初始化的时候会从ZooKeeper拉取集群元数据信息,保存在自己的缓存中,然后通过向集群其他Broker发送请求的方式将数据同步给对方。

Controller 底层事件模型

不管是监听WatcherZooKeeperWatcher线程,还是定时任务线程亦或是其他线程都需要访问或更新Controller从集群拉取的元数据。多线程 + 数据竞争 = 线程不安全。因此需要加锁来保证线程安全。

一开始Kafka就是用大量的锁来保证线程间的同步,各种加锁使得性能下降,并且多线程加锁的方式使得代码复杂度急剧上升,一不小心就会出各种问题,bug难修复。

因此在0.11版本之后将多线程并发访问改成了单线程事件队列模式将涉及到共享数据竞争相关方面的访问抽象成事件,将事件塞入阻塞队列中,然后单线程处理

也就是说其它线程还是在的,只是把涉及共享数据的操作封装成事件由专属线程处理。

先小结一下

到这我们已经清楚了Controller主要用来管理和协调集群,具体是通过ZooKeeper临时节点和Watcher机制来监控集群的变化(当然还有来自定时任务或其他线程的事件驱动),更新集群的元数据,并且通知集群中的其他Broker进行相关的操作(这部分下文会讲)。

而由于集群元数据会有并发修改问题,因此将操作抽象成事件,由阻塞队列和单线程处理来替换之前的多线程处理,降低代码的复杂度,提升代码的可维护性和性能。

接下来我们再讲讲Controller通知集群中的其他Broker的相关操作。

Controller的请求发送

ControllerZooKeeper那儿得到变更通知之后,需要告知集群中的Broker(包括它自身)做相应的处理。

Controller只会给集群的Broker发送三种请求:分别是 LeaderAndIsrRequestStopReplicaRequestUpdateMetadataRequest

LeaderAndIsrRequest

告知Broker主题相关分区 LeaderISR副本都在哪些 Broker上。

StopReplicaRequest

告知Broker停止相关副本操作,用于删除主题场景或分区副本迁移场景。

UpdateMetadataRequest

更新Broker上的元数据。

Controller事件处理线程会把事件封装成对应的请求,然后将请求写入对应的Broker的请求阻塞队列,然后RequestSendThread 不断从阻塞队列中获取待发送的请求。

先解释下controllerBrokerStateInfo,它就是个 POJO类,可以理解为集群每个broker对应一个controllerBrokerStateInfo.

然后再看下ControllerChannelManager,从名字可以看出它管理Controller和集群Broker之间的连接,并为每个Broker创建一个RequestSendThread 线程。

再小结一下

接着上个小结,事件处理线程将事件队列里面的事件处理之后再进行对应的请求封装,塞入需要通知的集群Broker对应的阻塞队列中,然后由每个Broker专属的requestSendThread发送请求至对应的Broker

总的步骤如下图:

现在应该已经清楚Controller大概是如何运作的,整体看起来还是生产者-消费者模型

接下来就进入源码环节。

Controller选举流程源码分析

事件处理的流程都是一样的,只是具体处理的事件逻辑不同,我们从Controller选举入手,来走一遍处理流程。

ControllerChangeHandler

选举会触发此handler,可以看到直接往ControllerEventManager的事件队列里塞。

这个QueueEventControllerEventManager,我们先来看看是啥。不过在此之前先了解下ControllerEventControllerEventProcessor

ControllerEvent:事件

ControllerEventProcessor : 事件处理接口

此接口的唯一实现类是 KafkaController

ControllerEventManager:事件处理器

此类主要用来管理事件处理线程和事件队列。

QueuedEvent:封装了ControllerEvent的类

主要是记录了下入队时间,并且提供了事件需要调用的方法。

ControllerEventThread:事件处理线程

整体而言还是很简单的,从队列拿事件,然后处理。

KafkaController#process

就是个switch,根据事件调用对应的processXXXX方法。

来关注下controller 重选事件

然后在onControllerFailover里面会调用sendUpdateMetadataRequest方法

中间省略调用,内容太多了,不是重点,到后来调用ControllerBrokerRequestBatch#sendRequest

最后还是调用了controllerChannelManager#sendRequest.

然后 RequestSendThread#doWork,不断从请求队列里拿请求,发送请求。

一个环节完成了!我们来看下整体流程图

最后我们来看下元数据到底有啥和KafkaController的一些字段。

ControllerContext:元数据

主要有运行中的Broker、所有主题信息、主题分区副本信息等。

KafkaController

基本上关键的字段都解释了,关于状态机那一块篇幅有限,之后再说。

最后

整体的流程就是将Controller相关操作都封装成一个个事件,然后将事件入队,由一个事件处理线程来处理,保证数据的安全(从这也可以看出,不是多线程就是好,有利有弊最终还是看场景)。

最后在通知集群中Broker的过程是每个Broker配备一个发送线程,因为发送是同步的,因此每个Broker线程隔离可以防止某个Broker阻塞而导致整体都阻塞的情况。

前面有说到Kafka Controller 强依赖 ZooKeeper。但是现在社区打算移除 ZooKeeper,因为ZooKeeper不适合频繁写,并且是CP的。而且用Kafka 还需要维护ZooKeeper集群,提升了系统的复杂度和运维难度,降低了系统的稳定性。

像位移信息,已经通过内部主题的方式保存,绕开了ZooKeeper

社区打算通过类 Raft 共识算法来选举Controller,并且把元数据存储在 Log 中的方式来做。

我是 yes,从一点点到亿点点,我们下篇见

往期推荐:

消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?

图解+代码|常见限流算法以及限流在单机分布式场景下的思考

面试官:说说Kafka处理请求的全流程

Kafka索引设计的亮点

Kafka日志段读写分析

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署