Kafka源码解析_kafka删除消费组命令

本文依然是以kafka0.8.2.2为例讲解

一,如何删除一个topic

删除一个topic有两个关键点:

1,配置删除参数

delete.topic.enable这个Broker参数配置为True。

2,执行

bin/kafka-topics.sh –zookeeper zk_host:port/chroot –delete –topic my_topic_name

假如不配置删除参数为true的话,topic其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除topic在Zookeeper的信息和日志,其实这个操作并不会清除kafkaBroker内存的topic数据。所以,此时最佳的策略是配置删除参数为true然后,重启kafka。

二,重要的类介绍

1,PartitionStateMachine

该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态

NonExistentPartition

NewPartition

OnlinePartition

OfflinePartition

2,ReplicaManager

负责管理当前机器的所有副本,处理读写、删除等具体动作。

读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。

3,ReplicaStateMachine

副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种

NewReplica:Crontroller在分区重分配的时候可以创建一个新的副本。只能接受变为follower的请求。前状态可以是NonExistentReplica

OnlineReplica:新启动的分区,能接受变为leader或者follower请求。前状态可以是NewReplica, OnlineReplica or OfflineReplica

OfflineReplica:死亡的副本处于这种状态。前状态可以是NewReplica, OnlineReplica

ReplicaDeletionStarted:分本删除开始的时候处于这种状态,前状态是OfflineReplica

ReplicaDeletionSuccessful:副本删除成功。前状态是ReplicaDeletionStarted

ReplicaDeletionIneligible:删除失败的时候处于这种状态。前状态是ReplicaDeletionStarted

NonExistentReplica:副本成功删除之后处于这种状态,前状态是ReplicaDeletionSuccessful

4,TopicDeletionManager

该类管理着topic删除的状态机

1),TopicCommand通过创建/admin/delete_topics/,来发布topic删除命令。

2),Controller监听/admin/delete_topic子节点变动,开始分别删除topic。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!

3),Controller有个后台线程负责删除Topic

三,源码彻底解析topic的删除过程

此处会分四个部分:

A),客户端执行删除命令作用

B),不配置delete.topic.enable整个流水的源码

C),配置了delete.topic.enable整个流水的源码

D),手动删除zk上topic信息和磁盘数据

1,客户端执行删除命令

bin/kafka-topics.sh –zookeeper zk_host:port/chroot –delete –topic my_topic_name

进入kafka-topics.sh我们会看到

exec (dirname 0)/kafka-run-class.sh kafka.admin.TopicCommand

进入TopicCommand里面,main方法里面

else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts)

实际内容是

val topics = getTopics(zkClient, opts) if (topics.length == 0) { println(“Topic %s does not exist”.format(opts.options.valueOf(opts.topicOpt))) } topics.foreach { topic => try { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) 在”/admin/delete_topics”目录下创建了一个topicName的节点。

2,假如不配置delete.topic.enable整个流水是

总共有两处listener会响应:

A),TopicChangeListener

B),DeleteTopicsListener

使用topic的删除命令删除一个topic的话,指挥触发DeleteTopicListener。

var topicsToBeDeleted = { import JavaConversions._ (children: Buffer[String]).toSet } val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t)) topicsToBeDeleted –= nonExistentTopics if(topicsToBeDeleted.size > 0) { info(“Starting topic deletion for topics ” + topicsToBeDeleted.mkString(”,”)) // mark topic ineligible for deletion if other state changes are in progress topicsToBeDeleted.foreach { topic => val preferredReplicaElectionInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(.topic).contains(topic) val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.keySet.map(.topic).contains(topic) if(preferredReplicaElectionInProgress || partitionReassignmentInProgress) controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) } // add topic to deletion list controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted) } 由于都会判断delete.topic.enable是否为true,假如不为true就不会执行,为true就进入执行

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))

controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

3,delete.topic.enable配置为true

此处与步骤2的区别,就是那两个处理函数。

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted) markTopicIneligibleForDeletion函数的处理为 if(isDeleteTopicEnabled) { val newTopicsToHaltDeletion = topicsToBeDeleted & topics topicsIneligibleForDeletion ++= newTopicsToHaltDeletion if(newTopicsToHaltDeletion.size > 0) info(“Halted deletion of topics %s”.format(newTopicsToHaltDeletion.mkString(“,”))) } 主要是停止删除topic,假如存储以下三种情况

  • Halt delete topic if –
    1. replicas being down
    1. partition reassignment in progress for some partitions of the topic
    1. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion主要作用是更新删除topic的集合,并激活TopicDeleteThread

def enqueueTopicsForDeletion(topics: Set[String]) { if(isDeleteTopicEnabled) { topicsToBeDeleted ++= topics partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic) resumeTopicDeletionThread() } } 在删除线程DeleteTopicsThread的doWork方法中

topicsQueuedForDeletion.foreach { topic => // if all replicas are marked as deleted successfully, then topic deletion is done if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { // clear up all state for this topic from controller cache and zookeeper completeDeleteTopic(topic) info(“Deletion of topic %s successfully completed”.format(topic)) } 进入completeDeleteTopic方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener // firing before the new topic listener when a deleted topic gets auto created partitionStateMachine.deregisterPartitionChangeListener(topic) val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionSuccessful) // controller will remove this replica from the state machine as well as its partition assignment cache replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica) val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic) // move respective partition to OfflinePartition and NonExistentPartition state partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition) partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition) topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic)) controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic)) controllerContext.removeTopic(topic) 主要作用是解除掉监控分区变动的listener,删除Zookeeper具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。

其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。

首次清除的话,在删除线程DeleteTopicsThread的doWork方法中

{ // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in // TopicDeletionSuccessful. That means, that either given topic haven’t initiated deletion // or there is at least one failed replica (which means topic deletion should be retried). if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { // mark topic for deletion retry markTopicForDeletionRetry(topic) }

进入markTopicForDeletionRetry val failedReplicas = controller.replicaStateMachine.replicasInState(topic,ReplicaDeletionIneligible) info(“Retrying delete topic for topic %s since replicas %s were not successfully deleted” .format(topic, failedReplicas.mkString(“,”))) controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica) 在ReplicaStateMachine的handleStateChanges方法中,调用了handleStateChange,处理OfflineReplica // send stop replica command to the replica so that it stops fetching from the leader brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition,deletePartition = false) 接着在handleStateChanges中 brokerRequestBatch.sendRequestsToBrokers(controller.epoch,controllerContext.correlationId.getAndIncrement) 给副本数据存储节点发送StopReplicaKey副本指令,并开始删除数据 stopReplicaRequestMap foreach { case(broker, replicaInfoList) => val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet debug(“The stop replica request (delete = true) sent to broker %d is %s” .format(broker, stopReplicaWithDelete.mkString(“,”))) debug(“The stop replica request (delete = false) sent to broker %d is %s” .format(broker, stopReplicaWithoutDelete.mkString(“,”))) replicaInfoList.foreach { r => val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch,correlationId) controller.sendRequest(broker, stopReplicaRequest, r.callback) } } stopReplicaRequestMap.clear() Broker的KafkaApis的Handle方法在接受到指令后 case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)

val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) 接着是在stopReplicas方法中 { controllerEpoch = stopReplicaRequest.controllerEpoch // First stop fetchers for all partitions, then stop the corresponding replicas replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r =>TopicAndPartition(r.topic, r.partition))) for(topicAndPartition <- stopReplicaRequest.partitions){ val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition,stopReplicaRequest.deletePartitions) responseMap.put(topicAndPartition, errorCode) } (responseMap, ErrorMapping.NoError) } 进一步进入stopReplica方法,正式进入日志删除 getPartition(topic, partitionId) match { case Some(partition) => if(deletePartition) { val removedPartition = allPartitions.remove((topic, partitionId)) if (removedPartition != null) removedPartition.delete() // this will delete the local log } 以上就是kafka的整个日志删除流水。

4,手动删除zk上topic信息和磁盘数据

TopicChangeListener会监听处理,但是处理很简单,只是更新了

val deletedTopics = controllerContext.allTopics – currentChildren controllerContext.allTopics = currentChildren

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,newTopics.toSeq) controllerContext.partitionReplicaAssignment =controllerContext.partitionReplicaAssignment.filter(p => 四,总结

Kafka的topic的删除过程,实际上就是基于Zookeeper做了一个订阅发布系统。Zookeeper的客户端创建一个节点/admin/delete_topics/,由kafka Controller监听到事件之后正式触发topic的删除:解除Partition变更监听的listener,清除内存数据结构,删除副本数据,删除topic的相关Zookeeper节点。想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、MySQL,redis、jvm、多线程、netty、kafka、的加尉xin(同英):1253431195 扩列获取资料学习,无工作经验不要加哦!

delete.topic.enable配置该参数为false的情况下执行了topic的删除命令,实际上未做任何动作。我们此时要彻底删除topic建议修改该参数为true,重启kafka,这样topic信息会被彻底删除,已经测试。

一般流行的做法是手动删除Zookeeper的topic相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181295.html原文链接:https://javaforall.cn

原文地址:https://cloud.tencent.com/developer/article/2150518

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340