Kerberos安全认证-连载12-Kafka Kerberos安全配置及访问

目录

1. Kafka配置Kerberos

2. 客户端操作Kafka

​​​​​​​3. Java API操作Kafka

4. StructuredStreaming操作Kafka

5. Flink 操作Kafka


技术连载系列,前面内容请参考前面连载11内容:​​​​​​​​​​​​​​Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_IT贫道的博客-CSDN博客

1. Kafka配置Kerberos

Kafka也支持通过Kerberos进行认证,避免非法用户操作读取Kafka中的数据,对Kafka进行Kerberos认证可以按照如下步骤实现。

1) 创建Kafka服务Princial主体并写入到keytab文件

在kerberos服务端node1节点执行如下命令创建Kafka服务主体:

[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node1"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node2"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node3"

在kerberos服务端node1节点执行如下命令将Kafka服务主体写入到keytab文件。

#node1节点执行命令,将主体写入到keytab
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node1@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node2@EXAMPLE.COM"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node3@EXAMPLE.COM"

以上命令执行完成后,在node1节点/home/keytabs目录下生成kafka.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node1~node3 Kafka所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。

[root@node1 ~]# scp /home/keytabs/kafka.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/kafka.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/kafka.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/kafka.service.keytab node5:/home/keytabs/

分发完成后,在集群各个节点上执行如下命令,修改kafka.service.keytab密钥文件访问权限:

chmod 770 /home/keytabs/kafka.service.keytab

​​​​​​​​​​​​​​2) 修改配置server.properties文件

在Kafka各个节点KAFKA_HOME/config/server.properties文件中加入如下配置以支持Kerberos安全认证。

#在node1~node3所有节点单独配置
listeners=SASL_PLAINTEXT://:9092
inter.broker.listener.name=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
zookeeper.set.acl=false
allow.everyone.if.no.acl.found=true

该配置需要在node1~node3所有节点配置,以上参数解释如下:

  • listeners=SASL_PLAINTEXT://:9092

指定kafka监听的协议和端口,SASL_PLAINTEXT表示使用SASL(Simple Authentication and Security Layer)机制进行认证和加密通信。

  • inter.broker.listener.name=SASL_PLAINTEXT

指定Kafka Broker 之间通信使用SASL_PLAINTEXT机制进行认证和加密通信。

  • sasl.mechanism.inter.broker.protocol=GSSAPI

指定broker之间通信使用SASL机制,GSSAPI是一种基于Kerberos的SASL机制,它使用GSS-API(Generic Security Services Application Programming Interface)进行认证。

  • sasl.enabled.mechanisms=GSSAPI

只启用了GSSAPI机制,表示Kafka只接受使用Kerberos进行认证的连接。

  • sasl.kerberos.service.name=kafka

指定Kafka在Kerberos中注册的服务名称,以便用于进行身份验证和授权。

  • authorizer.class.name=kafka.security.authorizer.AclAuthorizer

指定Kafka使用的授权器类。

  • zookeeper.set.acl=false

是否在ZooKeeper中设置ACL(Access Control List),这里设置为false,表示不对ZooKeeper节点设置ACL。

  • allow.everyone.if.no.acl.found=true

指定当没有匹配的ACL规则时,是否允许所有用户访问。

3) 准备kafka_jaas.conf文件

在node1~node3各个节点中准备kafka_jaas.conf文件,该文件配置kafka服务端和zookeeper客户端的身份验证和授权配置,由于zookeeper开启了Kerberos认证,所以这里需要进行zookeeper客户端的身份验证配置。

这里在各个kafka节点KAFKA_HOME/config/目录中创建kafka_jaas.conf文件,内容如下:

KafkaServer {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 storeKey=true
 keyTab="/home/keytabs/kafka.service.keytab"
 serviceName="kafka"
 principal="kafka/node1@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="zookeeper"
keyTab="/home/keytabs/zookeeper.service.keytab" principal="zookeeper/node3@EXAMPLE.COM";
};

以上在可以在node1 kafka客户端配置完成后,分发到node2~node3节点中,如下:

#在node1节点分发,分发后在node2、node3节点配置对应Server principal
[root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_jaas.conf node2:/software/kafka_2.12-3.3.1/config/
[root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_jaas.conf node3:/software/kafka_2.12-3.3.1/config/

注意:在node1~node3各个kafka节点中该文件中的KafkaServer对应的principal不同,为对应各个节点hostname。

4) 准备kafka_client_jaas.conf

在各个kafka节点配置kafka_client_jaas.conf配置文件,该文件作用主要是对kafka 客户端进行身份认证。这里在node1~node3节点KAFKA_HOME/config/中创建kafka_client_jaas.conf文件,内容如下:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 storeKey=true
 keyTab="/home/keytabs/kafka.service.keytab"
 serviceName="kafka"
 principal="kafka/node1@EXAMPLE.COM";
};

可以在node1节点配置该文件后分发到node2~node3节点中:

#分发到node2、node3节点,需要在对应节点配置对应的principal
[root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf node2:/software/kafka_2.12-3.3.1/config/
[root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf node3:/software/kafka_2.12-3.3.1/config/

以上文件分发完成后,需要在对应节点修改配置对应的Principal信息为对应的hostname。

5) 修改启动脚本kafka-server-start.sh

在kafka各个节点中配置KAFKA_HOME/bin/kafka-server-start.sh启动脚本,在该脚本中加入kafka_jaas.conf配置,加入的内容如下:

#在node1~node3各个节点都要配置kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_jaas.conf"

​​​​​​​​​​​​​​6) 修改kafka操作脚本

在当前集群中node1~node3节点是kafka服务端,同时如果在这3个节点上进行kafka 命令操作,这三个节点也是kafka客户端。在操作kafka时我们通常会操作KAFKA_HOME中的kafka-topic.sh、kafka-console-producer.sh、kafka-console-consumer.sh脚本,这些脚本需要进行kerberos认证,可以通过前面配置的kafka_client_jaas.conf文件进行Kerberos认证,所以这里在各个脚本中加入如下配置,避免在操作对应脚本时没有进行认证从而没有操作权限。

在node1~node3各个kafka 客户端配置以上脚本,可以先在node1节点进行配置各文件然后分发到其他kafka客户端,对应操作文件增加如下配置:

#vim /software/kafka_2.12-3.3.1/bin/kafka-topics.sh
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"

#vim /software/kafka_2.12-3.3.1/bin/kafka-console-producer.sh
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"

#vim /software/kafka_2.12-3.3.1/bin/kafka-console-consumer.sh
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"

在node1 kafka 客户端配置完成后,将配置后的各个脚本文件分发到其他kafka客户端节点上:

[root@node1 ~]# cd /software/kafka_2.12-3.3.1/bin/
[root@node1 bin]# scp ./kafka-topics.sh ./kafka-console-producer.sh ./kafka-console-consumer.sh node2:`pwd`
[root@node1 bin]# scp ./kafka-topics.sh ./kafka-console-producer.sh ./kafka-console-consumer.sh node3:`pwd`

​​​​​​​​​​​​​​7) 准备client.properites

在node1~node3各个kafka 客户端中准备client.properties配置文件,该配置文件内容如下,这里将该文件创建在各个节点的/root目录下。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

当Kafka通过Kerberos认证后,在执行KAFKA_HOME/bin目录下的脚本时,需要使用正确的协议、SASL机制及kerberos服务,所以这里将以上信息配置到client.properties文件中,在执行各个脚本时需要通过参数指定该文件,这样客户端可以和服务端正常通信。

在node1配置完成/root/client.properties文件后,分发到node2~node3节点中:

[root@node1 ~]# scp ./client.properties node2:`pwd`
[root@node1 ~]# scp ./client.properties node3:`pwd`

​​​​​​​8) 启动kafka集群

启动kafka集群前需要先启动Zookeeper,然后在各个kafka服务节点启动kafka,完成kafka集群启动。操作如下:

#node3~node5各节点启动zookeeper
zkServer.sh start

#node1~node3各节点启动kafka
startKafka.sh 

​​​​​​​2. 客户端操作Kafka

启动Kafka集群后,通过如下命令在Kafka集群查询、创建topic以及向topic中写入数据,以下命令可以执行在node1~node3各个kafka 客户端中。

#创建kafka topic 
kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic test --partitions 3 --replication-factor 1 --command-config  /root/client.properties

#查看集群topic
kafka-topics.sh --bootstrap-server node1:9092,node3:9092 --list --command-config /root/client.properties

#向kafka topic中写入数据
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092,node3:9092 --topic test --producer.config /root/client.properties 
>1
>2
>3
>4
>5

#读取kafka topic中的数据
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092,node3:9092 --topic test --from-beginning  --consumer.config /root/client.properties 
1
2
3
4
5

​​​​​​​3. Java API操作Kafka

可以按照如下步骤实现Java API操作Kerberos认证的Kafka数据。

1) 准备krb5.conf文件

将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。

2) 准备用户keytab文件

在kerberos服务端node1节点上将生成的kafka.server.keytab文件存入到window路径中,这里放在项目resource资源目录下,后续需要该文件进行客户端认证。

3) 准备kafka_client_jaas.conf文件

将Kafka 中kafka_client_jaas.conf文件放在window中某个路径中,并修改该文件中keytab路径为window路径:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 storeKey=true
 keyTab="D:/idea_space/KerberosAuth/KerberosAuthKafka/src/main/resources/kafka.service.keytab"
 serviceName="kafka"
 principal="kafka/node1@EXAMPLE.COM";
};

特别需要注意的是该文件中指定window路径时使用“/”或“\\”隔开各目录,否则客户端认证时读取不到keytab文件。这里将修改后的kafka_client_jaas.conf文件存入到项目resource资源目录下。

4) 编写Java代码向Kafka topic中读写数据

编写代码前,需要在项目pom.xml中引入如下依赖:

<!-- kafka client依赖包 -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.3.1</version>
</dependency>

Java API读写Kafka Topic代码如下:

/**
 *  Java API 操作Kerbros认证的Kafka
 *  使用 JAAS 来进行 Kerberos 认证
 *  注意:kafka_client_jaas.conf文件中的keytab文件路径需要使用双斜杠或者反单斜杠
 */
public class OperateAuthKafka {
    public static void main(String[] args) {
        //准备JAAS配置文件路径
        String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
        // Kerberos配置文件路径
        String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";

        System.setProperty("java.security.auth.login.config",kafkaClientJaasFile);
        System.setProperty("java.security.krb5.conf",krb5FilePath);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers","node1:9092,node3:9092");
        props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //kerberos安全认证
        props.setProperty("security.protocol","SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism","GSSAPI");
        props.setProperty("sasl.kerberos.service.name","kafka");

        //向Kafka topic中发送消息
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(props);
        kafkaProducer.send(new ProducerRecord<>("test","100"));
        kafkaProducer.send(new ProducerRecord<>("test","200"));
        kafkaProducer.send(new ProducerRecord<>("test","300"));
        kafkaProducer.send(new ProducerRecord<>("test","400"));
        kafkaProducer.send(new ProducerRecord<>("test","500"));

        kafkaProducer.close();
        System.out.println("消息发送成功");

        /**
         * 从Kafka topic中消费消息
         */
        props.setProperty("group.id","test"+ UUID.randomUUID());
        //设置消费的位置,earliest表示从头开始消费,latest表示从最新的位置开始消费
        props.setProperty("auto.offset.reset","earliest");
        props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            // 拉取数据
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String,String> record : consumerRecords) {
                // 获取数据对应的分区号
                int partition = record.partition();
                // 对应数据值
                String value = record.value();
                //对应数据的偏移量
                long lastoffset = record.offset();
                //对应数据发送的key
                String key = record.key();

                System.out.println("数据的key为:"+ key +
                        ",数据的value为:" + value +
                        ",数据的offset为:"+ lastoffset +
                        ",数据的分区为:"+ partition);
            }
        }

    }
}

4. StructuredStreaming操作Kafka

StructuredStreaming操作Kafka时同样需要准备krb5.conf、kafka.service.keytab、kafka_client_jaas.conf配置文件,步骤参考Java API操作Kafka部分。

编写代码前,需要在项目pom.xml中引入如下依赖:

<!-- SparkSQL -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.4.0</version>
</dependency>
<!-- Kafka 0.10+ Source For Structured Streaming-->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>3.4.0</version>
</dependency>

StructuredStreaming操作Kafka代码如下:

/**
 * StructuredStreaming 读取Kerberos 认证的Kafka数据
 */
public class StructuredStreamingReadAuthKafka {
    public static void main(String[] args) throws TimeoutException,StreamingQueryException {
        //准备JAAS配置文件路径
        String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
        // Kerberos配置文件路径
        String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";

        System.setProperty("java.security.auth.login.config",krb5FilePath);

        //1.创建对象
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("kafka source")
                .config("spark.sql.shuffle.partitions",1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        //2.读取kafka 数据
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers",node3:9092")
                .option("subscribe","test")
                .option("startingOffsets","earliest")
                //kerberos安全认证
                .option("kafka.security.protocol","SASL_PLAINTEXT")
                .option("kafka.sasl.mechanism","GSSAPI")
                .option("kafka.sasl.kerberos.service.name","kafka")
                .load();

        Dataset<Row> result = df.selectExpr("cast (key as string)","cast (value as string)");

        StreamingQuery query = result.writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}

5. Flink 操作Kafka

Flink操作Kafka时同样需要准备krb5.conf、kafka.service.keytab、kafka_client_jaas.conf配置文件,步骤参考Java API操作Kafka部分。

编写代码前,需要在项目pom.xml中引入如下依赖:

<!-- Flink批和流开发依赖包 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.16.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>1.16.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.16.0</version>
</dependency>

Flink操作Kafka代码如下:

/**
 * Flink 读取Kerberos 认证的Kafka数据
 */
public class FlinkReadAuthKafka {
    public static void main(String[] args) throws Exception {
        //准备JAAS配置文件路径
        String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
        // Kerberos配置文件路径
        String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";

        System.setProperty("java.security.auth.login.config",krb5FilePath);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Tuple2<String,String>> kafkaSource = KafkaSource.<Tuple2<String,String>>builder()
                .setBootstrapServers("node1:9092,node3:9092") //设置Kafka 集群节点
                .setTopics("test") //设置读取的topic
                .setGroupId("my-test-group") //设置消费者组
                //kerberos安全认证
                .setProperty("security.protocol","SASL_PLAINTEXT")
                .setProperty("sasl.mechanism","GSSAPI")
                .setProperty("sasl.kerberos.service.name","kafka")
                
                .setStartingOffsets(OffsetsInitializer.earliest()) //设置读取数据位置
                .setDeserializer(new KafkaRecordDeserializationSchema<Tuple2<String,String>>() {
                    //设置key,value 数据获取后如何处理
                    @Override
                    public void deserialize(ConsumerRecord<byte[],byte[]> consumerRecord,Collector<Tuple2<String,String>> collector) throws IOException {
                        String key = null;
                        String value = null;
                        if(consumerRecord.key() != null){
                            key = new String(consumerRecord.key(),"UTF-8");
                        }
                        if(consumerRecord.value() != null){
                            value = new String(consumerRecord.value(),"UTF-8");
                        }
                        collector.collect(Tuple2.of(key,value));
                    }

                    //设置置返回的二元组类型
                    @Override
                    public TypeInformation<Tuple2<String,String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<Tuple2<String,String>>() {
                        });
                    }
                })
                .build();

        DataStreamSource<Tuple2<String,String>> kafkaDS = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-source");

        kafkaDS.print();

        env.execute();
    }
}

欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!

原文地址:https://blog.csdn.net/qq_32020645/article/details/131344973

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署