2、Kafka 生产者

3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程
中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

在这里插入图片描述


3.1.2 生产者重要参数列表

在这里插入图片描述


3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

在这里插入图片描述


2)代码编写
(1)创建工程 kafka
(2)导入依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

(3)创建包名:com.atguigu.kafka.producer
(4)编写不带回调函数的 API 代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
 public static void main(String[] args) throws 
InterruptedException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");
 
 // key,value 序列化(必须):key.serializer,value.serializer
 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
 
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
 // 3. 创建 kafka 生产者对象
 KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);
 // 4. 调用 send 方法,发送消息
 for (int i = 0; i < 5; i++) {
 kafkaProducer.send(new 
ProducerRecord<>("first","atguigu " + i));
 }
 // 5. 关闭资源
 kafkaProducer.close();
 }
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
 public static void main(String[] args) throws 
InterruptedException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");
 // key, 
StringSerializer.class.getName());
 
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 // 3. 创建 kafka 生产者对象
 KafkaProducer<String,发送消息
 for (int i = 0; i < 5; i++) {
 // 添加回调
 kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
 @Override
 public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 if (exception == null) {
 // 没有异常,输出信息到控制台
 System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());
 } else {
 // 出现异常打印
 exception.printStackTrace();
 }
 }
 });
 // 延迟一会会看到数据发往不同分区
 Thread.sleep(2);
 }
 // 5. 关闭资源
 kafkaProducer.close();
 }
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3.3 同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {
 public static void main(String[] args) throws
InterruptedException, ExecutionException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息
 
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");
 // key, String>(properties);
// 4. 调用 send 方法,发送消息
 for (int i = 0; i < 10; i++) {
 // 异步发送 默认
// kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i));
 // 同步发送
 kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i)).get();
 }
 // 5. 关闭资源
 kafkaProducer.close();
 }
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.4 生产者分区
3.4.1 分区好处

在这里插入图片描述


3.4.2 生产者发送消息的分区策略
1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record,use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

在这里插入图片描述


2)案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
 public static void main(String[] args) {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息
 
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
StringSerializer.class.getName());
 KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);
 for (int i = 0; i < 5; i++) {
 // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
 kafkaProducer.send(new ProducerRecord<>("first", 
1,"","prince " + i), new Callback() {
 @Override
 public void onCompletion(RecordMetadata metadata, 
Exception e) {
 if (e == null){
 System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition()
 );
 }else {
 e.printStackTrace();
 }
 }
 });
 }
 kafkaProducer.close();
 }
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3)案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取
余得到 partition 值。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
 public static void main(String[] args) {
 Properties properties = new Properties();
 
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");
 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, String> kafkaProducer = new 
KafkaProducer<>(properties);
 for (int i = 0; i < 5; i++) {
 // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
分别发往 120
 kafkaProducer.send(new ProducerRecord<>("first", 
"a", 
Exception e) {
 if (e == null){
 System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition()
 );
 }else {
 e.printStackTrace();
 }
 }
 });
 }
 kafkaProducer.close();
 }
}

测试:
①key="a"时,在控制台查看结果。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

②key="b"时,在控制台查看结果。

主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2

③key="f"时,在控制台查看结果。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
 /
 * 返回信息对应的分区
 * @param topic 主题
 * @param key 消息的 key
 * @param keyBytes 消息的 key 序列化后的字节数组
 * @param value 消息的 value
 * @param valueBytes 消息的 value 序列化后的字节数组
 * @param cluster 集群元数据可以查看分区信息
 * @return
 */
 @Override
 public int partition(String topic, Object key, byte[] 
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 // 获取消息
 String msgValue = value.toString();
 // 创建 partition
 int partition;
 // 判断消息是否包含 atguigu
 if (msgValue.contains("atguigu")){
 partition = 0;
 }else {
 partition = 1;
 }
 // 返回分区号
 return partition;
 }
 // 关闭资源
 @Override
 public void close() {
 }
 // 配置方法
 @Override
 public void configure(Map<String, ?> configs) {
 }
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
 public static void main(String[] args) throws 
InterruptedException {
Properties properties = new Properties();
 
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
StringSerializer.class.getName());
 // 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");
 KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);
 for (int i = 0; i < 5; i++) {
 
 kafkaProducer.send(new ProducerRecord<>("first", 
Exception e) {
 if (e == null){
 System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition()
 );
 }else {
 e.printStackTrace();
 }
 }
 });
 }
 kafkaProducer.close();
 }
}

(4)测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.5 生产经验——生产者如何提高吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {
 public static void main(String[] args) throws 
InterruptedException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
 // batch.size:批次大小,默认 16K
 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 // linger.ms:等待时间,默认 0
 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
 33554432);
 // compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
 // 3. 创建 kafka 生产者对象
 KafkaProducer<String,"prince " + i));
 }
 // 5. 关闭资源
 kafkaProducer.close();
 }
} 

测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

3.6 生产经验——数据可靠性
1)ack 应答原理

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


2)代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {
 public static void main(String[] args) throws 
InterruptedException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
 102:9092");
 
 // key,
StringSerializer.class.getName());
 
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 // 设置 acks
 properties.put(ProducerConfig.ACKS_CONFIG, "all");
 // 重试次数 retries,默认是 int 最大值,2147483647
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 // 3. 创建 kafka 生产者对象
 KafkaProducer<String,"prince " + i));
 }
 // 5. 关闭资源
 kafkaProducer.close();
 }
}

3.7 生产经验——数据去重
3.7.1 数据传递语义

在这里插入图片描述


3.7.2 幂等性
1)幂等性原理

在这里插入图片描述


2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。

3.7.3 生产者事务
1)Kafka 事务原理

在这里插入图片描述


2)Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3)单个 Producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
 public static void main(String[] args) throws 
InterruptedException {
 // 1. 创建 kafka 生产者的配置对象
 Properties properties = new Properties();
 // 2. 给 kafka 配置对象添加配置信息
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
 "hadoop102:9092");
 // key,value 序列化
 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 // 设置事务 id(必须),事务 id 任意起名
 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transaction_id_0");
 // 3. 创建 kafka 生产者对象
 KafkaProducer<String, String>(properties);
 // 初始化事务
 kafkaProducer.initTransactions();
 // 开启事务
 kafkaProducer.beginTransaction();
 try {
 // 4. 调用 send 方法,发送消息
 for (int i = 0; i < 5; i++) {
 // 发送消息
 kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i));
 }
// int i = 1 / 0;
 // 提交事务
 kafkaProducer.commitTransaction();
 } catch (Exception e) {
 // 终止事务
 kafkaProducer.abortTransaction();
 } finally {
 // 5. 关闭资源
 kafkaProducer.close();
 }
 }
}

3.8 生产经验——数据有序

在这里插入图片描述


3.9 生产经验——数据乱序

在这里插入图片描述

原文地址:https://blog.csdn.net/weixin_45817985/article/details/133939024

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署