Kafka——配置文件详解

1、server.properties

属性 默认值 描述
broker.id 0 每个broker都需要有一个标识符,使用broker.id来表示。它的默认值是0,也可以被设置成其他任意整数。这个值在整个Kafka集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,可以在服务器节点间交换使用这些D。建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将①号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如host1.example…com、host2.example.com),那么用这些数字来设置broker.id就再好不过了。
log.dirs /tmp/kafka-logs Kafka把所有消息都保存在磁盘上,存放这些日志片段的目录是通过log.dis指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
port 9092 如果使用配置样本来启动Kafka,它会监听9092端口。修改port配置参数可以把它设置成其他任意可用的端口。要注意,如果使用1024以下的端口,需要使用oot权限启动Kafka,不过不建议这么做。
zookeeper.connect null 用于保存broker元数据的Zookeeper地址是通过zookeeper.connect来指定的。localhost:2181表示这个Zookeeper是运行在本地的2181端口上。该配置参数是用冒号分隔的一组hostname:port/path列表,每一部分的含义如下:
hostname是Zookeeper服务器的机器名或IP地址;
port是Zookeeper的客户端连接端口;
/path是可选的Zookeeper路径,作为Kafka集群的chroot环境。如果不指定,默认使用根路径。如果指定的chroot路径不存在,broker会在启动的时候创建它。
message.max.bytes 1000000 broker通过设置message.max.bytes参数来限制单个消息的大小,默认值是1000000,也就是1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于message.max.bytes指定的值,消息的实际大小可以远大于这个值。这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响IO吞吐量。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费
num.io.threads 8 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量
queued.max.requests 500 IO线程可以处理的队列大小,若实际请求数超过此大小,若实际请求数超过此大小,网络线程将停止接收新的请求
socket.send.buffer.bytes 100 * 1024 socket的发送缓冲区大小
socket.receive.buffer.bytes 100 * 1024 socket的接收缓冲区大小
socket.request.max.bytes 100 * 1024 * 1024 服务器允许请求的最大值,用来防止内存溢出,其值应该小于Java Heap Size
num.partitions 1 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于num.partitions指定的值,需要手动创建该主题。
log.segment.bytes 1024 * 1024 * 1024 Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。以上的设置作用在日志片段上,而不是作用在单个消息上。当消息到达broker时,它们被追加到分区的当前日志片段上。当日志片段大小达到Log.segment.bytes指定的上限(默认是1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。例如,如果一个主题每天只接收100MB的消息,而log.segment.bytes使用默认设置,那么需要10天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果1ogretention.ms被设为604800000(也就是1周),那么日志片段最多需要17天才会过期。这是因为关闭日志片段需要10天的时间,而根据配置的过期时间,还需要再保留7天时间(要等到日志片段里的最后一个消息过期才能被删除)。
log.segment.ms 另一个可以控制日志片段关闭时间的参数是Log.segment.ms,它指定了多长时间之后日志片段会被关闭。就像log.retention.bytes和log.retention.ms这两个参数一样,log.segment.bytes和Log.retention.ms这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms没有设定值,所以只根据大小来关闭日志片段。
log.retention.{ms,minutes,hours} 7 days kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。
log.retention.bytes -1 每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。如果同时指定了log.retention.bytes和log.retention.ms(或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。例如,假设1og·retention.ms设置为86400000(也就是1天),log.retention.bytes设置为1000000000(也就是1GB),如果消息字节总数在不到一天的时间就超过了1GB,那么多出来的部分就会被删除。相反,如果消息字节总数小于1GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于1GB。
log.retention.check.interval.ms 5 minutes 删除策略的检查周期
auto.create.topics.enable true 自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。
default.replication.factor 1 默认副本数量
replica.lag.time.max.ms 10000 此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除
replica.lag.max.messages 4000 如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除
replica.socket.timeout.ms 30 * 1000 replica向leader发送请求的超时时间
replica.socket.receive.buffer.bytes 64 * 1024 replica接收leader数据的缓冲区大小
replica.fetch.max.bytes 1024 * 1024 Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存
replica.fetch.wait.max.ms 500 replicas同leader之间通信的最大等待时间,失败了会重试
num.replica.fetchers 1 leader进行复制的线程数,增大这个数值会增加follower的IO
fetch.purgatory.purge.interval.requests 1000 获取请求清除的清除间隔(以请求数为单位)
zookeeper.session.timeout.ms 6000 ZooKeeper session超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。
zookeeper.connection.timeout.ms 6000 客户端连接zookeeper的超时时间
zookeeper.sync.time.ms 2000 zookeeper的follower同leader的同步时间
controlled.shutdown.enable true 允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders:转移到其它brokers上,建议启用,增加集群稳定性。
auto.leader.rebalance.enable true 是否自动平衡broker之间的分配策略
leader.imbalance.per.broker.percentage 10 leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.check.interval.seconds 300 检查leader是否不平衡的时间间隔
offset.metadata.max.bytes 4096 客户端保留offset信息的最大空间大小
connections.max.idle.ms 600000 这个参数用来指定在多久之后关闭闲置的连接
num.recovery.threads.per.data.dir 1 默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。也就是说,如果num.recovery.threads.per.data.dir被设为8,并且log.dir指定了3个路径,那么总共需要24个线程。
unclean.leader.election.enable true 如果unclean.leader.election.enable参数的值为false,那么就意味着非ISR中的副本不能够参与选举.如果unclean.leader.election.enable参数的值为true,那么可以从非ISR集合中选举follower副本成为新的leader。
delete.topic.enable false 启用deletetopic参数,建议设置为true。
offsets.topic.num.partitions 50 自动创建的位移主题分区数
offsets.topic.retention.minutes 1440 超过此期限的偏移将被标记为删除。当日志清理程序压缩偏移主题时,将进行实际清除
offsets.retention.check.interval.ms 600000 偏移管理器检查过时偏移的频率。
offsets.topic.replication.factor 3 偏移量提交主题的复制因子。表示kafka的内部topic consumer_offsets副本数。当该副本所在的broker宕机,consumer_offsets只有一份副本,该分区宕机。使用该分区存储消费分组offset位置的消费者均会收到影响,offset无法提交,从而导致生产者可以发送消息但消费者不可用。所以需要设置该字段的值大于1。
offsets.topic.segment.bytes 104857600 偏移量主题的段大小。
offsets.load.buffer.size 5242880 每次从offset段文件往缓存加载offsets数据时的读取的数据大小。默认5M
offsets.commit.required.acks -1 可以接受consumer提交之前所需的ack。通常,不应覆盖默认值(-1)
offsets.commit.timeout.ms 5000 consumer 提交的延迟时间,offsets提交将被延迟,直到偏移量topic的所有副本都收到提交或达到此超时。 这与producer请求超时类似。

2、producer.properties

属性 默认值 描述
metadata.broker.list 启动时producer2查询orokers的列表,可以是集群中所有brokersl的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket;连接。格式是:host1:port1,host2:port2。
request.required.acks 0 #设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0:producer不会等待broker发送ack
1:当leader接收到消息后发送ack
-1:当所有的follower都同步消息成功后发送ack
request.timeout.ms 10000 Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。
producer.type sync 同步异步模式。asynca表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以patch的形式push数据,这样会极大的提高broker性能,推荐设置为异步。
serializer.class kafka.serializer.DefaultEncoder 指定序列化处理类
key.serializer.class key的序列化类
partitioner.class kafka.producer.DefaultPartitioner 指定分区处理类
compression.codec none 指定producer消息的压缩格式,可选参数为:none、gzip、snappy
compressed.topics null 启用压缩的topic:名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对所有topic有效。
message.send.max.retries 3 Producer发送失败时重试次数。若网络出现问题,可能会导致不断重试。
retry.backoff.ms 100 每次重试的间隔时间
topic.metadata.refresh.interval.ms 600 * 1000 每个使用者和生产者实例将以topic.metadata.refresh.interval.ms参数定义的间隔获取主题元数据。
queue.buffering.max.ms 5000 在async模式下,当message缓存超时后,将会批量发送给broker,默认5000ms
queue.buffering.max.messages 10000 在async模式下,Producer端允许buffer的最大消息量,如果超过这个数值,producer就会阻塞或者丢掉消息
queue.enqueue.timeout.ms -1 当消息在producer端沉积的条数达到“queue.buffering.max.messages"后 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息),此时producer可以继续阻塞,或者将消息抛弃
-1:无阻塞超时限制,消息不会被抛弃
0 :立即清空队列,消息被抛弃
batch.num.messages 200 采用异步模式时,一个batch:缓存的消息数量。达到这个数量值时producer才会发送消息。
send.buffer.bytes 100 * 1024 发送缓冲大小
client.id 设置单个Kafka生产者或消费者客户的名称

3、consumer.propertis

属性 默认值 描述
group.id Consumer的组ID,相同goup.id的consumer)属于同一个组。
zookeeper.connect 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3
consumer.id 消费者的ID,若是没有设置的话,会自增
socket.timeout.ms 30 * 1000 socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
zookeeper.session.timeout.ms 6000 zookeeper的心跳超时时间,超过这个时间就认为是dead消费者
zookeeper.connection.timeout.ms 6000 zookeeper的等待连接时间
zookeeper.sync.time.ms 2000 zookeeper的follower同leader的同步时间
auto.offset.reset largest 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
socket.receive.buffer.bytes 64*1024 socket的接受缓存空间大小
fetch.message.max.bytes 1024*1024 从每个分区获取的消息大小限制
auto.commit.enable true 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
auto.commit.interval.ms 60*1000 自动提交的时间间隔
queued.max.message.chunks 10 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值
rebalance.max.retries 4 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册"Partition Owner registry"节点信息,但是有可能此时旧consumer尚没有释放此节点,此值用于控制,注册节点的重试次数.
rebalance.backoff.ms 2000 每次再平衡的时间间隔
refresh.leader.backoff.ms 每次重新选举leader的时间
fetch.min.bytes 1 server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
fetch.wait.max.ms 100 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间
consumer.timeout.ms -1 指定时间内没有消息到达就抛出异常,一般不需要改

4、server.properties模板

# broker id,多个broker服务器的话,每个broker id必须不同
broker.id=1

# kafka broker所在节点的
hostnamehostname=10.1.1.1.3:9092

#  处理网络请求的线程数
num.network.threads= 8

# 执行磁盘IO的线程数
num.io.threads=8

# server使用的send buffer大小。
socket.send.buffer.bytes=1048576

# server使用的recive buffer大小。
socket.receive.buffer.bytes=1048576

# 接受的最大请求大小(防止OOM)
socket.request.max.bytes=104857600

#-------------added by Kaim ---------------
# 加入队列的最大请求数(超过该值,network thread阻塞)
queued.max.requests=16

# purgatory(炼狱)是个容器,用来存放不能马上答复的网络请求。如果能答复请求则从炼狱删除。这个是fetch炼狱保存的最大请求数。设置的比默认值小是据说因为这里有个BUG,不知道0.10.x中解决没

# BUG说明见:http://blog.csdn.net/stark_summer/article/details/50203133fetch.purgatory.purge.interval.requests=100

# 生产者炼狱保存的最大请求数
producer.purgatory.purge.interval.requests=100

############################# 日志配置#############################
# 可以设置多个日志存放的路径
log.dirs=~/kafka-logs

# 默认每个主题的分区数,生产环境建议值:
8num.partitions= 8

# 启停时做日志恢复每个目录所需的线程数,采用RAID的时候可以增大该值
num.recovery.threads.per.data.dir=1

# 写入磁盘的消息批大小
log.flush.interval.messages=10000

# 强制刷新消息到磁盘的时间阈值
log.flush.interval.ms=10000

# 日志保留的最少时间 由于做压测,防止占用磁盘太多,保留时间为1ms# 
log.retention.hours=168log.retention.minutes=5

# 每个日志段大小,超过该值会生成新日志段
log.segment.bytes=1073741824

# 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。
log.retention.check.interval.ms=300000

# --------------added by kami--------------
# 自动创建主题
auto.create.topics.enable=true

# 当执行一次fetch后,需要一定的空间扫描最近的offset,设置的越大越好,一般使用默认值就可以
log.index.interval.bytes=4096

# 每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log  segment。log.index.size.max.bytes=10485760

# 检查是否需要fsync的时间间隔
log.flush.scheduler.interval.ms=2000

# 即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。
log.roll.hours=168

# server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。默认值均为一百万
message.max.bytes=1000000

############################# Zookeeper #############################
# zookeeper server地址,如果有多个则用逗号分隔
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

# Timeout in ms for connecting to zookeeper

# zk连接的超时时间
zookeeper.connection.timeout.ms=6000

# zk follower的同步延迟时间
zookeeper.sync.time.ms = 2000

############################ replication configuration added by KamiWan##############

# 从leader备份数据的线程数
num.replica.fetchers=4

# 备份时每次fetch的最大值
replica.fetch.max.bytes=1048576

# follwer执行fetcher请求时的最大等待时间
replica.fetch.wait.max.ms=500

# 默认的replication数量,可以根据所需要的可靠性要求来配置
default.replication.factor=2

原文地址:https://blog.csdn.net/cold___play/article/details/132119955

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署