Kafka从入门到进阶

1.  Apache Kafka是一个分布式流平台

1.1  流平台有三个关键功能:

  1. 发布和订阅流记录,类似于一个消息队列或企业消息系统
  2. 以一种容错的持久方式存储记录流
  3. 在流记录生成的时候就处理它们

1.2  Kafka通常用于两大类应用:

  1. 构建实时流数据管道,在系统或应用程序之间可靠地获取数据
  2. 构建对数据流进行转换或输出的实时流媒体应用程序

1.3  有几个特别重要的概念:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

The Kafka cluster stores streams of records in categories called topics.

Each record consists of a key,a value,and a timestamp.

  Kafka作为集群运行在一个或多个可以跨多个数据中心的服务器上

  从这句话表达了三个意思:

  1. Kafka是以集群方式运行的
  2. 集群中可以只有一台服务器,也有可能有多台服务器。也就是说,一台服务器也是一个集群,多台服务器也可以组成一个集群
  3. 这些服务器可以跨多个数据中心

  Kafka集群按分类存储流记录,这个分类叫做主题

  这句话表达了以下几个信息:

  1. 流记录是分类存储的,也就说记录是归类的
  2. 我们称这种分类为主题
  3. 简单地来讲,记录是按主题划分归类存储的

  每个记录由一个键、一个值和一个时间戳组成

1.4  Kafka有四个核心API:

  • Producer API :允许应用发布一条流记录到一个或多个主题
  • Consumer API :允许应用订阅一个或多个主题,并处理流记录
  • Streams API :允许应用作为一个流处理器,从一个或多个主题那里消费输入流,并将输出流输出到一个或多个输出主题,从而有效地讲输入流转换为输出流
  • Connector API :允许将主题连接到已经存在的应用或者数据系统,以构建并允许可重用的生产者或消费者。例如,一个关系型数据库的连接器可能捕获到一张表的每一次变更

(画外音:我理解这四个核心API其实就是:发布、订阅、转换处理、从第三方采集数据。)

在Kafka中,客户端和服务器之间的通信是使用简单的、高性能的、与语言无关的TCP协议完成的。

2.  Topics and Logs(主题和日志)

一个topic是一个分类,或者说是记录被发布的时候的一个名字(画外音:可以理解为记录要被发到哪儿去)。

在Kafka中,topic总是有多个订阅者,因此,一个topic可能有0个,1个或多个订阅该数据的消费者。

对于每个主题,Kafka集群维护一个分区日志,如下图所示:

每个分区都是一个有序的、不可变的记录序列,而且记录会不断的被追加,一条记录就是一个结构化的提交日志(a structured commit log)。

分区中的每条记录都被分配了一个连续的id号,这个id号被叫做offset(偏移量),这个偏移量唯一的标识出分区中的每条记录。(PS:如果把分区比作数据库表的话,那么偏移量就是主键)

Kafka集群持久化所有已发布的记录,无论它们有没有被消费,记录被保留的时间是可以配置的。例如,如果保留策略被设置为两天,那么在记录发布后的两天内,可以使用它,之后将其丢弃以释放空间。在对数据大小方面,Kafka的性能是高效的,恒定常量级的,因此长时间存储数据不是问题。

事实上,唯一维护在每个消费者上的元数据是消费者在日志中的位置或者叫偏移量。偏移量是由消费者控制的:通常消费者在读取记录的时候会线性的增加它的偏移量,但是,事实上,由于位置(偏移量)是由消费者控制的,所有它可以按任意它喜欢的顺序消费记录。例如:一个消费者可以重置到一个较旧的偏移量来重新处理之前已经处理过的数据,或者跳转到最近的记录并从“现在”开始消费。

这种特性意味着消费者非常廉价————他们可以来来去去的消息而不会对集群或者其它消费者造成太大影响。

日志中的分区有几个用途。首先,它们允许日志的规模超出单个服务器的大小。每个独立分区都必须与宿主的服务器相匹配,但一个主题可能有多个分区,所以它可以处理任意数量的数据。第二,它们作为并行的单位——稍后再进一步。

画外音:简单地来说,日志分区的作用有两个:一、日志的规模不再受限于单个服务器;二、分区意味着可以并行。

什么意思呢?主题建立在集群之上,每个主题维护了一个分区日志,顾名思义,日志是分区的;每个分区所在的服务器的资源(比如:CPU、内存、带宽、磁盘等)是有限的,如果不分区(可以理解为等同于只有一个)的话,必然受限于这个分区所在的服务器,那么多个分区的话就不一样了,就突破了这种限制,服务器可以随便加,分区也可以随便加。

3.  Distribution(分布)

日志的分区分布在集群中的服务器上,每个服务器处理数据,并且分区请求是共享的。每个分区被复制到多个服务器上以实现容错,到底复制到多少个服务器上是可以配置的。

Each partition is replicated across a configurable number of servers for fault tolerance.

每个分区都有一个服务器充当“leader”角色,并且有0个或者多个服务器作为“followers”。leader处理对这个分区的所有读和写请求,而followers被动的从leader那里复制数据。如果leader失败,followers中的其中一个会自动变成新的leader。每个服务器充当一些分区的“leader”的同时也是其它分区的“follower”,因此在整个集群中负载是均衡的。

也就是说,每个服务器既是“leader”也是“follower”。我们知道一个主题可能有多个分区,一个分区可能在一个服务器上也可能跨多个服务器,然而这并不以为着一台服务器上只有一个分区,是可能有多个分区的。每个分区中有一个服务器充当“leader”,其余是“follower”。leader负责处理这个它作为leader所负责的分区的所有读写请求,而该分区中的follow只是被动复制leader的数据。这个有点儿像HDFS中的副本机制。例如:分区-1有服务器A和B组成,A是leader,B是follower,有请求要往分区-1中写数据的时候就由A处理,然后A把刚才写的数据同步给B,这样的话正常请求相当于A和B的数据是一样的,都有分区-1的全部数据,如果A宕机了,B成为leader,接替A继续处理对分区-1的读写请求。

需要注意的是,分区是一个虚拟的概念,是一个逻辑单元。

4.  Producers(生产者)

生产者发布数据到它们选择的主题中。生产者负责选择将记录投递到哪个主题的哪个分区中。要做这件事情,可以简单地用循环方式以到达负载均衡,或者根据一些语义分区函数(比如:基于记录中的某些key)

5.  Consumers(消费者)

消费者用一个消费者组名来标识它们自己(PS:相当于给自己贴一个标签,标签的名字是组名,以表明自己属于哪个组),并且每一条发布到主题中的记录只会投递给每个订阅的消费者组中的其中一个消费者实例。消费者实例可能是单独的进程或者在单独的机器上。

如果所有的消费者实例都使用相同的消费者组,那么记录将会在这些消费者之间有效的负载均衡。

如果所有的消费者实例都使用不同的消费者组,那么每条记录将会广播给所有的消费者进程。

上图中其实那个Kafka Cluster换成Topic会更准确一些

一个Kafka集群有2个服务器,4个分区(P0-P3),有两个消费者组。组A中有2个消费者实例,组B中有4个消费者实例。
通常我们会发现,主题不会有太多的消费者组,每个消费者组是一个“逻辑订阅者”(以消费者组的名义订阅主题,而非以消费者实例的名义去订阅)。每个组由许多消费者实例组成,以实现可扩展性和容错。这仍然是发布/订阅,只不过订阅者是一个消费者群体,而非单个进程。

在Kafka中,这种消费方式是通过用日志中的分区除以使用者实例来实现的,这样可以保证在任意时刻每个消费者都是排它的消费,即“公平共享”。Kafka协议动态的处理维护组中的成员。如果有心的实例加入到组中,它们将从组中的其它成员那里接管一些分区;如果组中有一个实例死了,那么它的分区将会被分给其它实例。

(画外音:什么意思呢?举个例子,在上面的图中,4个分区,组A有2个消费者,组B有4个消费者,那么对A来讲组中的每个消费者负责4/2=2个分区,对组B来说组中的每个消费者负责4/4=1个分区,而且同一时间消息只能被组中的一个实例消费。如果组中的成员数量有变化,则重新分配。)

Kafka只提供分区下的记录的总的顺序,而不提供主题下不同分区的总的顺序。每个分区结合按key划分数据的能力排序对大多数应用来说是足够的。然而,如果你需要主题下总的记录顺序,你可以只使用一个分区,这样做的做的话就意味着每个消费者组中只能有一个消费者实例。

6.  保证

在一个高级别的Kafka给出下列保证:

  1. 被一个生产者发送到指定主题分区的消息将会按照它们被发送的顺序追加到分区中。也就是说,如果记录M1和M2是被同一个生产者发送到同一个分区的,而且M1是先发送的,M2是后发送的,那么在分区中M1的偏移量一定比M2小,并且M1出现在日志中的位置更靠前。
  2. 一个消费者看到记录的顺序和它们在日志中存储的顺序是一样的。
  3. 对于一个副本因子是N的主题,我们可以容忍最多N-1个服务器失败,而不会丢失已经提交给日志的任何记录。

7.  Spring Kafka

Spring提供了一个“模板”作为发送消息的高级抽象。它也通过使用@KafkaListener注释和“监听器容器”提供对消息驱动POJOs的支持。这些库促进了依赖注入和声明式的使用。

7.1  纯Java方式

 1 package com.cjs.example.quickstart;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4  org.apache.kafka.clients.consumer.ConsumerRecord;
 5  org.apache.kafka.clients.producer.ProducerConfig;
 6  org.apache.kafka.common.serialization.IntegerDeserializer;
 7  org.apache.kafka.common.serialization.IntegerSerializer;
 8  org.apache.kafka.common.serialization.StringDeserializer;
 9  org.apache.kafka.common.serialization.StringSerializer;
10 import org.springframework.kafka.core.*;
11  org.springframework.kafka.listener.KafkaMessageListenerContainer;
12  org.springframework.kafka.listener.MessageListener;
13  org.springframework.kafka.listener.config.ContainerProperties;
14 
15  java.util.HashMap;
16  java.util.Map;
17 
18 public class PureJavaDemo {
19 
20     /**
21      * 生产者配置
22      */
23     private static Map<String,Object> senderProps() {
24         Map<String,Object> props = new HashMap<>();
25         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.101.5:9093");
26         props.put(ProducerConfig.RETRIES_CONFIG,027         props.put(ProducerConfig.BATCH_SIZE_CONFIG,1638428         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.29         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.30         return props;
31     }
32 
33     34      * 消费者配置
35      36      consumerProps() {
37         Map<String,1)">38         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,1)">39         props.put(ConsumerConfig.GROUP_ID_CONFIG,"hello"40         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false41         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100"42         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000"43         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,IntegerDeserializer.44         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.45         46 47 
48     49      * 发送模板配置
50      51     static KafkaTemplate<Integer,String> createTemplate() {
52         Map<String,Object> senderProps = senderProps();
53         ProducerFactory<Integer,String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
54         KafkaTemplate<Integer,String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
55          kafkaTemplate;
56 57 
58     59      * 消息监听器容器配置
60      61     static KafkaMessageListenerContainer<Integer,1)"> createContainer() {
62         Map<String,Object> consumerProps = consumerProps();
63         ConsumerFactory<Integer,String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
64         ContainerProperties containerProperties = new ContainerProperties("test"65         KafkaMessageListenerContainer<Integer,String> container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
66          container;
67 68 
69 
70     static void main(String[] args) throws InterruptedException {
71         String topic1 = "test"; //  主题
72 
73         KafkaMessageListenerContainer container = createContainer();
74         ContainerProperties containerProperties = container.getContainerProperties();
75         containerProperties.setMessageListener(new MessageListener<Integer,1)">() {
76             @Override
77             void onMessage(ConsumerRecord<Integer,1)"> record) {
78                 System.out.println("Received: " + record);
79             }
80         });
81         container.setBeanName("testAuto"82 
83         container.start();
84 
85         KafkaTemplate<Integer,String> kafkaTemplate = createTemplate();
86         kafkaTemplate.setDefaultTopic(topic1);
87 
88         kafkaTemplate.sendDefault(0,"foo"89         kafkaTemplate.sendDefault(2,"bar"90         kafkaTemplate.sendDefault(0,"baz"91         kafkaTemplate.sendDefault(2,"qux"92 
93         kafkaTemplate.flush();
94         container.stop();
95 
96         System.out.println("结束"97 98 
99 }

运行结果:

Received: ConsumerRecord(topic = test,partition = 0,offset = 67,CreateTime = 1533300970788,serialized key size = 4,serialized value size = 3,headers = RecordHeaders(headers = [],isReadOnly = false),key = 0,value = foo)
Received: ConsumerRecord(topic = test,offset = 68,CreateTime = 1533300970793,key = 2,value = bar)
Received: ConsumerRecord(topic = test,offset = 69,value = baz)
Received: ConsumerRecord(topic = test,offset = 70,value = qux)

7.2  更简单一点儿,用SpringBoot

 org.springframework.beans.factory.annotation.Autowired;
 org.springframework.boot.CommandLineRunner;
 org.springframework.context.annotation.Bean;
 org.springframework.context.annotation.Configuration;
 org.springframework.kafka.annotation.KafkaListener;
 org.springframework.kafka.core.KafkaTemplate;
10 
@Configuration
 JavaConfigurationDemo {
13 
14     @KafkaListener(topics = "test")
15     void listen(ConsumerRecord<String,1)">16         System.out.println("收到消息: " +17 18 
19     @Bean
public CommandLineRunner commandLineRunner() {
21         return new MyRunner();
22 23 
24     class MyRunner implements CommandLineRunner {
25 
26         @Autowired
27         private KafkaTemplate<String,1)">28 
29         @Override
void run(String... args)  Exception {
31             kafkaTemplate.send("test","foo1"32             kafkaTemplate.send("test","foo2"33             kafkaTemplate.send("test","foo3"34             kafkaTemplate.send("test","foo4"35         }
36 37 }

application.properties配置

spring.kafka.bootstrap-servers=192.168.101.5:9092
spring.kafka.consumer.group-id=world

8.  生产者

 com.cjs.example.send;
 org.springframework.kafka.core.DefaultKafkaProducerFactory;
 org.springframework.kafka.core.ProducerFactory;
11 
 Config {
18     public Map<String,1)"> producerConfigs() {
19         Map<String,1)">20         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.101.5:9092"21         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,1)">22         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,1)">23         24 26     public ProducerFactory<Integer,1)"> producerFactory() {
(producerConfigs());
28 29 
30 31     public KafkaTemplate<Integer,1)"> kafkaTemplate() {
32         new KafkaTemplate<Integer,1)">(producerFactory());
33 34 
35 }
 org.springframework.kafka.support.SendResult;
 org.springframework.stereotype.Component;
 org.springframework.util.concurrent.ListenableFuture;
 org.springframework.util.concurrent.ListenableFutureCallback;
@Component
class MyCommandLineRunner 14     @Autowired
private KafkaTemplate<Integer,1)">16 
17     void sendTo(Integer key,String value) {
18         ListenableFuture<SendResult<Integer,String>> listenableFuture = kafkaTemplate.send("test",key,value);
19         listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>20 21              onFailure(Throwable throwable) {
22                 System.out.println("发送失败啦"23                 throwable.printStackTrace();
27             void onSuccess(SendResult<Integer,1)"> sendResult) {
28                 System.out.println("发送成功," + sendResult);
    @Override
34     35         sendTo(1,"aaa"36         sendTo(2,"bbb"37         sendTo(3,"ccc"38 39 
40 
41 }

运行结果:

发送成功,SendResult [producerRecord=ProducerRecord(topic=test,partition=null,headers=RecordHeaders(headers = [],isReadOnly = true),key=1,value=aaa,timestamp=null),recordMetadata=test-0@37]
发送成功,SendResult [producerRecord=ProducerRecord(topic=test,key=2,value=bbb,recordMetadata=test-0@38]
发送成功,SendResult [producerRecord=ProducerRecord(topic=test,key=3,value=ccc,recordMetadata=test-0@39]

9.  消费者@KafkaListener

 com.cjs.example.receive;
 org.springframework.kafka.annotation.TopicPartition;
 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 org.springframework.kafka.config.KafkaListenerContainerFactory;
 org.springframework.kafka.core.ConsumerFactory;
 org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 org.springframework.kafka.listener.AbstractMessageListenerContainer;
 org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 org.springframework.kafka.support.Acknowledgment;
 org.springframework.kafka.support.KafkaHeaders;
 org.springframework.messaging.handler.annotation.Header;
 org.springframework.messaging.handler.annotation.Payload;
22 
 java.util.List;
25 26 
27  Config2 {
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,1)"> kafkaListenerContainerFactory() {
32         ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>        factory.setConsumerFactory(consumerFactory());
34         factory.setConcurrency(335         ContainerProperties containerProperties = factory.getContainerProperties();
36         containerProperties.setPollTimeout(200037         containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
38          factory;
39 41     private ConsumerFactory<Integer,1)"> consumerFactory() {
42         (consumerProps());
43 44 
45     private Map<String,1)">46         Map<String,1)">47         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,1)">48         props.put(ConsumerConfig.GROUP_ID_CONFIG,"hahaha"        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
50         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,1)">51         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,1)">52         53 54 
55 
56     @KafkaListener(topics = "test"57      listen(String data) {
58         System.out.println("listen 收到: " + data);
60 
61 
62     @KafkaListener(topics = "test",containerFactory = "kafkaListenerContainerFactory"63      listen2(String data,Acknowledgment ack) {
64         System.out.println("listen2 收到: " +65         ack.acknowledge();
66 67 
68     @KafkaListener(topicPartitions = {@TopicPartition(topic = "test",partitions = "0")})
69     void listen3(ConsumerRecord<?,?>70         System.out.println("listen3 收到: " + record.value());
71 73 
74     @KafkaListener(id = "xyz",topics = "test"75      listen4(@Payload String foo,                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,1)">77                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,1)">78                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,1)">79                         @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
80         System.out.println("listen4 收到: "81         System.out.println(foo);
82         System.out.println(key);
        System.out.println(partition);
84         System.out.println(topic);
85         System.out.println(offsets);
88 }

9.1  Committing Offsets

如果enable.auto.commit设置为true,那么kafka将自动提交offset。如果设置为false,则支持下列AckMode(确认模式)。

消费者poll()方法将返回一个或多个ConsumerRecords

  • RECORD :处理完记录以后,当监听器返回时,提交offset
  • BATCH  :当对poll()返回的所有记录进行处理完以后,提交偏offset
  • TIME   :当对poll()返回的所有记录进行处理完以后,只要距离上一次提交已经过了ackTime时间后就提交
  • COUNT  :当poll()返回的所有记录都被处理时,只要从上次提交以来收到了ackCount条记录,就可以提交
  • COUNT_TIME :和TIME以及COUNT类似,只要这两个中有一个为true,则提交
  • MANUAL :消息监听器负责调用Acknowledgment.acknowledge()方法,此后和BATCH是一样的
  • MANUAL_IMMEDIATE :当监听器调用Acknowledgment.acknowledge()方法后立即提交

10.  Spring Boot Kafka

10.1  application.properties

spring.kafka.bootstrap-servers=192.168.101.5:9092

10.2  发送消息

 com.cjs.example;
 org.springframework.web.bind.annotation.RequestMapping;
 org.springframework.web.bind.annotation.RestController;
 7 
 javax.annotation.Resource;
 9 
@RestController
11 @RequestMapping("/msg" MessageController {
    @Resource
17     @RequestMapping("/send" String send(String topic,String key,1)">        kafkaTemplate.send(topic,1)">20         return "ok"23 }

10.3  接收消息

 org.springframework.kafka.annotation.KafkaListeners;
 MessageListener {
11          * 监听订单消息
13      14     @KafkaListener(topics = "ORDER",groupId = "OrderGroup" listenToOrder(String data) {
16         System.out.println("收到订单消息:" +19          * 监听会员消息
21      22     @KafkaListener(topics = "MEMBER",groupId = "MemberGroup"void listenToMember(ConsumerRecord<String,1)">24         System.out.println("收到会员消息:" +27          * 监听所有消息
     *
     * 任意时刻,一条消息只会发给组中的一个消费者
32      * 消费者组中的成员数量不能超过分区数,这里分区数是1,因此订阅该主题的消费者组成员不能超过1
33          @KafkaListeners({@KafkaListener(topics = "ORDER",groupId = "OrderGroup"),1)">            @KafkaListener(topics = "MEMBER",groupId = "MemberGroup")})
    public void listenToAll(String data) {
        System.out.println("啊啊啊");
    }
40 }

11.  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    modelVersion>4.0.0</>

    groupId>com.cjs.exampleartifactId>cjs-kafka-exampleversion>0.0.1-SNAPSHOTpackaging>jarnamedescription></parent>
        >org.springframework.boot>spring-boot-starter-parent>2.0.4.RELEASErelativePath/> <!-- lookup parent from repository -->
    propertiesproject.build.sourceEncoding>UTF-8project.reporting.outputEncodingjava.version>1.8dependenciesdependency>
            >spring-boot-starter-web>org.springframework.kafka>spring-kafka>

        >spring-boot-starter-testscope>testbuildpluginsplugin>
                >spring-boot-maven-plugin>


project>

12.  其它

有兴趣的话可以看一下其它几篇:

Kafka集群搭建

Kafka基本知识回顾及复制

Kakfa消息投递语义

Push VS Pull

Kafka Producer Consumer

Kafka快速开始

Kafka介绍

13.  参考

http://kafka.apache.org/intro

http://spring.io/projects/spring-kafka

https://docs.spring.io/spring-boot/docs/2.0.4.RELEASE/reference/htmlsingle/#boot-features-kafka

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署