分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

1. kafka 生产者发送消息整体架构

生产者发送消息流程参考图1:

img

先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。

接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。

broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。

生产者发送消息流程参考图2:

在这里插入图片描述

① 主线程和Sender 线程:

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

② 消息累加器 RecordAccumulator :

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

③ 双端队列 Deque<ProducerBatch>:

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。

④ 消息批次 ProducerBatch:

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

⑤ BufferPool 和 ByteBuffer:

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

为什么使用 BufferPool 呢?

在Kafka中,RecordAccumulator是用来将生产者发送的消息缓存起来,以便批量发送到Kafka集群。在RecordAccumulator的内部,还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。

BufferPool是一个ByteBuffer的池子,它维护了一组ByteBuffer,这些ByteBuffer可以被多个线程共享。当一个线程需要一个ByteBuffer时,它可以从BufferPool中获取一个可用的ByteBuffer,使用完后再将它归还给BufferPool。这样可以避免频繁地创建和销毁ByteBuffer,提高了内存的利用率和性能。

⑥ ProducerBatch的大小和batch.size参数的关系:

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

batch.size参数是Producer的一个配置参数,用于控制ProducerBatch的大小。它指定了一批消息的最大大小,单位是字节。当Producer发送的消息数量达到batch.size或者等待时间超过linger.ms时,Producer会将这一批消息打包成一个ProducerBatch并发送到Kafka集群中的Broker。

因此,batch.size参数的大小会直接影响ProducerBatch的大小。如果batch.size设置得太小,会导致ProducerBatch的大小也很小,这样会增加网络传输的开销,降低消息发送的效率。如果batch.size设置得太大,会导致ProducerBatch的大小过大,可能会占用过多的内存,甚至会导致消息发送超时或失败。

⑦ 创建 ProduceRequest:

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。

⑧ 正在处理的请求 InFlightRequests:

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中。InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

2. Kafka 生产者重要参数配置

01. acks

acks 指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。在默认情况下,Kafka会在leader副本收到消息后向客户端回应消息写入成功。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。

① acks=0: 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量(每秒钟传输的数据量)。

② acks=1:默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。

③ acks=all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为all(或者-1)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。

分区中的所有副本统称为AR 。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR,ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR,由此可见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即AR=ISR,OSR集合为空。

注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

// 设置 acks=1
properties.put(ProducerConfig.ACKS_CONFIG,"1");

你会发现,为acks设置的值越小,生产者发送消息的速度就越快。也就是说,我们通过牺牲可靠性来换取较低的生产者延迟。过,端到端延迟是指从消息生成到可供消费者读取的时间,这对3种配置来说都是一样的。这是因为为了保持一致性,在消息被写入所有同步副本之前,Kafka不允许消费者读取它们。因此,如果你关心的是端到端延迟,而不是生产者延迟,那么就不需要在可靠性和低延迟之间做权衡了:你可以选择最可靠的配置,但仍然可以获得相同的端到端延迟。

02. 消息传递时间

有几个参数可用来控制开发人员最感兴趣的生产者行为:在调用send()方法后多长时间可以知道消息发送成功与否。这也是等待Kafka返回成功响应或放弃重试并承认发送失败的时间。

从Kafka 2.1开始,我们将ProduceRecord的发送时间分成如下两个时间间隔,它们是被分开处理的。

  • 异步调用send()所花费的时间。在此期间,调用send()的线程将被阻塞。
  • 从异步调用send()返回到触发回调(不管是成功还是失败)的时间,也就是从ProduceRecord被放到批次中直到Kafka成功响应、出现不可恢复异常或发送超时的时间。

如果同步调用send(),那么发送线程将持续阻塞,也就无法知道每个时间间隔是多长。

在这里插入图片描述

① max.block.ms

这个参数用于控制在调用send()或通过partitionsFor()显式地请求元数据时生产者可以发生阻塞的时间。当生产者的发送缓冲区被填满或元数据不可用时,这些方法就可能发生阻塞。当达到max.block.ms配置的时间时,就会抛出一个超时异常。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

// 设置 max.block.ms 为 60s
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,60);

② delivery.timeout.ms

这个参数用于控制从消息准备好发送(send()方法成功返回并将消息放入批次中)到broker响应或客户端放弃发送(包括重试)所花费的时间。这个时间应该大于linger.ms和request.timeout.ms。如果配置的时间不满足这一点,则会抛出异常。通常,成功发送消息的速度要比delivery.timeout.ms快得多。

如果生产者在重试时超出了delivery.timeout.ms,那么将执行回调,并会将broker之前返回的错误传给它。如果消息批次还没有发送完毕就超出了delivery.timeout.ms,那么也将执行回调,并会将超时异常传给它。

可以将这个参数配置成你愿意等待的最长时间,通常是几分钟,并使用默认的重试次数(几乎无限制)。基于这样的配置,只要生产者还有时间(或者在发送成功之前),它都会持续重试。这是一种合理的重试方式。我们的重试策略通常是:“在broker发生崩溃的情况下,首领选举通常需要30秒才能完成,因此为了以防万一,我们会持续重试120秒。”为了避免烦琐地配置重试次数和重试时间间隔,只需将delivery.timeout.ms设置为120。

// 设置 delivery.timeout.ms 为 120s
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,120);

③ request.timeout.ms

这个参数用于控制生产者在发送消息时等待服务器响应的时间。需要注意的是,这是指生产者在放弃之前等待每个请求的时间,不包括重试、发送之前所花费的时间等。如果设置的值已触及,但服务器没有响应,那么生产者将重试发送,或者执行回调,并传给它一个TimeoutException。

④ retries 和 retry.backoff.ms

当生产者收到来自服务器的错误消息时,这个错误有可能是暂时的(例如,一个分区没有首领)。在这种情况下,retries参数可用于控制生产者在放弃发送并向客户端宣告失败之前可以重试多少次。在默认情况下,重试时间间隔是100毫秒,但可以通过retry.backoff.ms参数来控制重试时间间隔。

⑤ 总结:

并不建议在当前版本的Kafka中使用这些参数。相反,你可以测试一下broker在发生崩溃之后需要多长时间恢复(也就是直到所有分区都有了首领副本),并设置合理的delivery.timeout.ms,让重试时间大于Kafka集群从崩溃中恢复的时间,以免生产者过早放弃重试。

生产者并不会重试所有的错误。有些错误不是暂时的,生产者就不会进行重试(例如,“消息太大”错误)。通常,对于可重试的错误,生产者会自动进行重试,所以不需要在应用程序中处理重试逻辑。你要做的是集中精力处理不可重试的错误或者当重试次数达到上限时的情况。

03. linger.ms

这个参数指定了生产者在发送消息批次之前等待更多消息加入批次的时间。生产者会在批次被填满或等待时间达到linger.ms时把消息批次发送出去。在默认情况下,只要有可用的发送者线程,生产者都会直接把批次发送出去,就算批次中只有一条消息。

把linger.ms设置成比0大的数,可以让生产者在将批次发送给服务器之前等待一会儿,以使更多的消息加入批次中。虽然这样会增加一点儿延迟,但也极大地提升了吞吐量。这是因为一次性发送的消息越多,每条消息的开销就越小,如果启用了压缩,则计算量也更少了。

04. buffer.memory

这个参数用来设置生产者要发送给服务器的消息的内存缓冲区大小。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。需要注意的是,这个异常与其他异常不一样,它是send()方法而不是Future对象抛出来的。

05. batch.size

当有多条消息被发送给同一个分区时,生产者会把它们放在同一个批次里。这个参数指定了一个批次可以使用的内存大小。需要注意的是,该参数是按照字节数而不是消息条数来计算的。当批次被填满时,批次里所有的消息都将被发送出去。但是生产者并不一定都会等到批次被填满时才将其发送出去。那些未填满的批次,甚至只包含一条消息的批次也有可能被发送出去。所以,就算把批次大小设置得很大,也不会导致延迟,只是会占用更多的内存而已。但如果把批次大小设置得太小,则会增加一些额外的开销,因为生产者需要更频繁地发送消息。

06. max.in.flight.requests.per.connection

这个参数指定了生产者在收到服务器响应之前可以发送多少个消息批次。它的值越大,占用的内存就越多,不过吞吐量也会得到提升。Apache wiki页面上的实验数据表明,在单数据中心环境中,该参数被设置为2时可以获得最佳的吞吐量,但使用默认值5也可以获得差不多的性能。

07. compression.type

在默认情况下,生产者发送的消息是未经压缩的。这个参数可以被设置为snappy、gzip、lz4或zstd,这指定了消息被发送给broker之前使用哪一种压缩算法。snappy压缩算法由谷歌发明,虽然占用较少的CPU时间,但能提供较好的性能和相当可观的压缩比。如果同时有性能和网络带宽方面的考虑,那么可以使用这种算法。gzip压缩算法通常会占用较多的CPU时间,但提供了更高的压缩比。如果网络带宽比较有限,则可以使用这种算法。使用压缩可以降低网络传输和存储开销,而这些往往是向Kafka发送消息的瓶颈所在。

08. max.request.size

这个参数用于控制生产者发送的请求的大小。它限制了可发送的单条最大消息的大小和单个请求的消息总量的大小。假设这个参数的值为1 MB,那么可发送的单条最大消息就是1 MB,或者生产者最多可以在单个请求里发送一条包含1024个大小为1 KB的消息。另外,broker对可接收的最大消息也有限制(message.max.bytes),其两边的配置最好是匹配的,以免生产者发送的消息被broker拒绝。

09. receive.buffer.bytes和 send.buffer.bytes

这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设为–1,就使用操作系统默认值。如果生产者或消费者与broker位于不同的数据中心,则可以适当加大它们的值,因为跨数据中心网络的延迟一般都比较高,而带宽又比较低。

10. enable.idempotence

从0.11版本开始,Kafka支持精确一次性语义。

假设为了最大限度地提升可靠性,你将生产者的acks设置为all,并将delivery.timeout.ms设置为一个比较大的数,允许进行尽可能多的重试。这些配置可以确保每条消息被写入Kafka至少一次。但在某些情况下,消息有可能被写入Kafka不止一次。假设一个broker收到了生产者发送的消息,然后消息被写入本地磁盘并成功复制给了其他broker。此时,这个broker还没有向生产者发送响应就发生了崩溃。而生产者将一直等待,直到达到request.timeout.ms,然后进行重试。重试发送的消息将被发送给新的首领,而这个首领已经有这条消息的副本,因为之前写入的消息已经被成功复制给它了。现在,你就有了一条重复的消息。

为了避免这种情况,可以将enable.idempotence设置为true。当幂等生产者被启用时,生产者将给发送的每一条消息都加上一个序列号。如果broker收到具有相同序列号的消息,那么它就会拒绝第二个副本,而生产者则会收到DuplicateSequenceException,这个异常对生产者来说是无害的。

如果要启用幂等性,那么max.in.flight.requests.per.connection应小于或等于5、retries应大于0,并且acks被设置为all。如果设置了不恰当的值,则会抛出ConfigException异常。

3. kafka 生产者中何时发生QueueFullException?

kafka 生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常 QueueFullException。

Kafka生产者在发送消息时,如果发送的消息数量超过了生产者内部缓冲区的大小,就会发生QueueFullException异常。这个缓冲区的大小由生产者配置参数buffer.memory来控制,默认值为32MB。当生产者发送的消息数量超过了这个缓冲区的大小时,就会抛出QueueFullException异常。

此外,如果生产者发送消息的速度超过了Kafka集群的处理速度,也会导致生产者内部缓冲区满,从而抛出QueueFullException异常。在这种情况下,可以通过调整生产者的发送速度或增加Kafka集群的处理能力来解决问题。

4. Kafka Producer是否直接将数据发送到broker的leader节点?

Kafka Producer会直接将数据发送到broker的leader节点。在Kafka中,每个分区都有一个leader节点和多个follower节点。Producer会将消息发送到分区的leader节点,然后由leader节点负责将消息复制到所有的follower节点。这种设计可以提高Kafka的可靠性和可扩展性,因为如果某个节点出现故障,Kafka可以自动将leader节点切换到其他可用的节点上,从而保证数据的可靠性和高可用性。

5. Kafka 如何实现批量发送?

Kafka 实现批量发送的方式是通过生产者的 batch.size 参数来控制的。该参数指定了生产者在发送消息之前等待的最大字节数。当生产者收集到的消息大小达到 batch.size 时,它会将这些消息一起发送到 Kafka 集群。

具体来说,当生产者收到一条消息时,它会将该消息添加到一个缓冲区中。如果缓冲区中的消息大小达到了 batch.size,或者等待时间超过了 linger.ms(指定了生产者在发送消息之前等待的最大时间),生产者就会将缓冲区中的所有消息一起发送到 Kafka 集群。

通过调整 batch.size 和 linger.ms 参数,可以控制生产者发送消息的频率和批量大小,从而优化生产者的性能。

6. Kafka 生产者最佳实践?

01. 发送消息

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   //消息主题。
        null,   //分区编号。建议为null,由Producer分配。
        System.currentTimeMillis(),   //时间戳。
        String.valueOf(value.hashCode()),   //消息键。
        value   //消息值。
));

02. Key和Value

Kafka 的消息有以下两个字段:

  • Key:消息的标识。
  • Value:消息内容。

为了便于追踪,建议为消息设置一个唯一的Key。可以通过Key追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。如果消息发送量较大,建议不要设置Key,并使用黏性分区策略。

03. 失败重试

分布式环境下,由于网络等原因偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是ACK失败,也有可能是确实没发送成功。

Kafka 是VIP网络架构,长时间不进行通信连接会被主动断开,因此,不是一直活跃的客户端会经常收到connection reset by peer错误,建议重试消息发送。可以根据业务需求,设置以下重试参数:

  • retries:消息发送失败时的重试次数。
  • retry.backoff.ms,消息发送失败的重试间隔,建议设置为1000,单位:毫秒。

04. 使用异步发送方式

05. Acks

Acks的说明如下:

  • acks=0:无需服务端的Response、性能较高、丢数据风险较大。
  • acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
  • acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。

为了提升发送性能, 建议设置为acks=1

06. 提升发送性能(减少碎片化发送请求)

一般情况下,一个Kafka Topic 会有多个分区。Kafka Producer客户端在向服务端发送消息时,需要先确认往哪个Topic的哪个分区发送。我们给同一个分区发送多条消息时,Producer客户端将相关消息打包成一个Batch,批量发送到服务端。Producer客户端在处理Batch时,是有额外开销的。一般情况下,小Batch会导致Producer客户端产生大量请求,造成请求队列在客户端和服务端的排队,并造成相关机器的CPU升高,从而整体推高了消息发送和消费延迟。一个合适的Batch大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。

Batch机制,Kafka Producer端主要通过两个参数进行控制:

  • batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。如果batch.size设置过小,有可能影响发送性能和稳定性。建议保持默认值16384。单位:字节。
  • linger.ms : 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。建议根据业务场景, 设置linger.ms在100~1000之间。单位:毫秒。

因此,Kafka Producer客户端什么时候把消息批量发送至服务器是由batch.sizelinger.ms共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能,保障服务的稳定性, 建议您设置batch.size=16384linger.ms=1000

07. 黏性分区策略

只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是Kafka Producer端设置的分区策略。Kafka Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,云消息队列 Kafka 版Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。

在消息没有指定Key的情况下,Kafka 2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,Kafka 在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。

黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。

如果您使用的Kafka Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数partitioner.class设置指定的分区策略。

原文地址:https://blog.csdn.net/qq_42764468/article/details/132220317

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署