实时计算大作业kafka+zookeeper+storm+dataV

第一章 总体需求

1.1.课题背景

近年来,大数据称为热门词汇,大数据分析随着互联网技术的发展愈加深入电商营销之

中,越来越多的电商企业利用大数据分析技术,利用信息化对产业发展营销方向进行确定,

对电子商务行业大数据的特性和背后价值进行深入挖掘,打破传统营销的空间、人群等限制,

在电商场景、渠道客户等各个方面洞察用户的精准营销,从而实现个性化营销与服务等,为

企业发展注入新的活力。而在大数据分析与电商营销的融合过程中,主要是对消费者们的心

理动态特征及行为等方面的分析,把营销与消费者关系作为纽带连接起来,通过得出的有效

数据,对电商营销的整个过程进行实时监控,来优化营销方案与流程,以达到更好的经济效

应。

本文以股票交易背景,针对其用户多、用户地域分布广且在线业务量大的特 点,开发一个关于股票交易信息的大数据看板,用于可实时观测股票交易大数据信息,展示部分重要业绩数据。

1.2.功能需求

本文主要目标是通过实时计算技术、Web 技术构建一个股票交易信息大数据看

a) 订单的已处理速度,单位为“条/秒”;

b) 近 1 分钟与当天累计的总交易金额、交易数量;

c) 近 1 分钟与当天累计的买入、卖出交易量;

d) 近 1 分钟与当天累计的交易金额排名前 10 的股票信息;

e) 近 1 分钟与当天累计的交易量排名前 10 的交易平台;

f) 展示全国各地下单客户的累计数量(按省份),在地图上直观展示;

g) 展示不同股票类型的交易量分布情况;

h) [可选]对单支股票的交易量爆发式增长进行预警:

第二章 方案分析

在课题的测试环境中,订单数据模拟器将模拟实时产生股票交易订单信息,且数据会自动存入 MySQL 数据库相应的表中,因此需要通过对接 MySQL 来同步接收数据并统计保存结果。本文结合实时计算的相关技术,制定了两种方案实现课题需求。

2.1.方案一

方案一的架构如图 1 所示。由于数据模拟器产生的数据会存入 MySQL,因此选择了 Kaf ka 作为消息中间件,把 MySQL 的数据传入到 Kafka 中,再使用 Storm 作为流计算平台,Orderstock 表的数据进行相关统计,同时将统计结果存储至 Mysql

网页的部分主要使用 Datav 框架。

该方案的优点是:

1. 采用实时读取 MySQL 的方式对接 Kafka,不需要对数据进行处理操作;

2. Storm 中仅用一个拓扑就可以实现表的数据统计,结构简单易懂;

3. Datav 具有操作简单的特点;  

该方案的缺点是:

1. Storm 在实际过程中的吞吐量较低;

2. Redis 将数据保存至内存中,因此在大数据场景下对机器性能有一定要求

2.2.方案二

方案二的架构如图 2 所示。实时生产的电商数据传入 MySQL 数据库中,消息中间件 Ka

4fka 通过 MaxWell 读取 MySQL 的数据增量日志 binlog 作为数据生产源,再利用 Flink 流计算平台统计不同表的数据结果存储至 MySQL。而网页部分则采用 Datav 作为开发框架,通过读取 MySQL 中的统计结果传至前端界面.该方案的优点是:

1. Maxwell 能够将 MySQL 的日志 binlog 作为数据源,并且以 json 形式输出至 Kafka

实现实时接收数据增量;

2. Flink 对窗口事务的支持较为完善,自带窗口聚合方式实现数据统计;

3.Datav上手简单。

该方案的缺点是:

1. Maxwell 只能将 binlog 输出至一个 Topic,因此在消费 Kafka 数据时,需要手动过滤

不同表的日志并统计;

2. Flink 需要解析 Json 数据,因此对于较为复杂的数据结构,解析过程较为繁琐;

3. 反复连接 MySQL 写入实时数据容易消耗大量时间,导致数据库负载过高,降低运

行效率。

2.3.最终方案

综合上述三种方案分析比较后,可知方案一的架构相对简洁,且在读写效率和方案可行

性方面,方案一都优于其他二者;同时其性能较为稳定,能够满足项目基本需求。因此,本

文最终采用方案一作为讲解的方案。

第三章 总体方案

主要由数据源、消息中间件、流计算系统、实时数据存储和实时数据应用五大板块组成。

数据源为 MySQL,其中 MySQL 不断接收来自订单模拟器传输的数据。消息中间件为 Ka

fkaKafka MySQL 中的数据依次读取出来,并设置一个主题 TOPIC1TOPIC1 负责存储 orderstock数据库的订单详情表信息。流计算系统选取的是 Storm,在 Storm 中设置一个拓扑,拓扑中的 OrderSpout订阅 TOPIC1的数据,并将其作为数据源分别发送至 OrderBolt。在 OrderBolt DetailBolt 中,同时设置了一个 JDBC 对象来获取 Java 与 MYsql数据库的连接,把数据的统计结果实时更新至 Datav 中,实现对实时数据的计算。

第四章 单元实现

4.1.数据采集

由于订单数据模拟器会将数据实时发送至 MySQL 数据库中,因此本文选择在 Kafka

产者中建立与 MySQL 数据库的连接,将最新的数据依次读取出来,并创建 Orderstock 类存放

Orderstock 表的数据。

创建 Kafka 生产者的配置信息。

Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092,kafka2:9093,kafka3:9094");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<>(props);

String info_js = JSON.toJSONString(info,SerializerFeature.WriteMapNullValue);
//打印
System.out.println(info_js);
//将数据发送至topic
producer.send(new ProducerRecord<String,String>("bookOrder-7",info_js));

4.2.数据的分发与订阅

Kafka 中的生产者接收来自 MySQL 实时新增的数据,发送至 topic1

主题。本文选择通过 Storm 对接 Kafka 传输的数据,在 OrderSpout DetailSpout 中创

Kafka 的消费者,订阅 topic1主题,其中 OrderSpout 负责接收 Ord

er 表数据,DetailSpout 负责接收 Orderstock 表数据,将数据传输到 Bolt 中。

Properties props = new Properties();
props.put("zookeeper.connect","localhost:2181");
props.put("group.id","group");
props.put("zookeeper.session.timeout.ms","4000");
props.put("zookeeper.sync.time.ms","200");
props.put("auto.commit.interval.ms","1000");
props.put("auto.offset.reset","latest");
props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("serializer.class","kafka.serializer.StringEncoder");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
ArrayList<String> topics = new ArrayList<>();
topics.add("bookOrder");

 public void nextTuple() {
        ConsumerRecords<String,String> consumerRecords = consumer.poll(0);
        if(consumerRecords != null){
//            System.out.println("DetailSpout发射数据...");
            consumerRecords.forEach(record -> collector.emit(new Values(record.value())));
//            System.out.println("DetailSpout发射成功!");
        }
    }

4.3.数据的统计与存储

Spout 接收到数据后,需要发送至 Bolt 进行数据统计,其中 OrderSpout 将数据发送至 O

rderBoltDetailSpout 将数据发送至 DetailBolt

OrderBolt 中,主要的统计项目是:

1. 总交易数量;

2. 上市交易数量;

3. 总交易额;

4. 总买入金额;

5. 总卖出金额;

6.实时买入金额

7.实时卖出金额

8.各个股票的总交易金额

9.各股票的实时交易金额

10.各交易公司的总交易金额

11.各交易公司的实时交易金额

12.各服务类型的总交易金额

13.各服务类型的实时交易金额

14.各省市的总交易金额

15.各省市的实时交易金额

本文主要通过 Mysql 进行数据的实时统计与存储,因此在 OrderBolt DetailBolt 分创建

jdbc 对象获取与 Mysql 数据库的连接,且每接收到一条数据,对 Mysql 中指定 Key value

进行自增或自减操作,实现数据的实时统计。由于不同数据的统计要求不一,需要依据其特

点选用相应的存储模式。具体流程见图所示

try{
            write++;
            connection = DriverManager.getConnection(dbUrl,username,password);
            String sql = "UPDATE tradeCount SET "
                    + "trade = ?,"
                    + "totalTradeCount = ?,"
                    + "totalTradeAmount = ?,"
                    + "totalBuyAmount = ?,"
                    + "totalSellAmount = ?,"
                    + "minuteTradeCount = ?,"
                    + "minuteTradeAmount = ?,"
                    + "minuteBuyAmount = ?,"
                    + "minuteSellAmount = ?,"
                    + "guolianTradeVolume = ?,"
                    + "tongdaxinTradeVolume = ?,"
                    + "changchengTradeVolume = ?,"
                    + "guotaijunanTradeVolume = ?,"
                    + "yinheTradeVolume = ?,"
                    + "tonghuashunTradeVolume = ? "
                    + "WHERE id = '股票' ";
            preparedStatement = connection.prepareStatement(sql);

            String updateSql = "UPDATE tradeCount SET totalTradeAmount = ?,minuteTradeAmount = ?,totalBuyAmount = ?,totalSellAmount = ? WHERE id = ?";
            updateStmt = connection.prepareStatement(updateSql);

            String Sql = "UPDATE tradeCount SET value= ? WHERE id = ?";
            update = connection.prepareStatement(Sql);

            String SqlZq = "UPDATE tradeCount SET minuteTradeAmount = ?,platform = ? WHERE id = ?";
            updateZq = connection.prepareStatement(SqlZq);

            String SqlHy = "UPDATE tradeCount SET totalTradeAmount = ? WHERE id = ?";
            updateHy = connection.prepareStatement(SqlHy);
//            String[] values = value.split(",");    //等待删除,可能会删了它
            String stockId = detail.getStock_name();     // 交易类型id
            String local = detail.getTrade_place();         //交易地点
            String server = detail.getIndustry_type();     //服务类型
            long tradeCount = detail.getTrade_volume();     //单次交易数量
            double tradePrice = detail.getTrade_price();  //赋值交易价格
            double tradeAmount = tradeCount * tradePrice;  //交易总价格
            // 更新累计值
            totalTradeCount.addAndGet(tradeCount*2);  //累计交易总量
            totalTradeAmount.addAndGet((long)tradeAmount*2);  //累计交易总金额
            // 更新近一分钟的值
            long currentTime = System.currentTimeMillis();
            if (currentTime - lastUpdateTime > 60000) {
                minuteTradeCount.set(0);  //   分钟贸易计数
                minuteTradeAmount.set(0);//    分钟交易额
                minuteBuyAmount.set(0);//      分钟购买金额
                minuteSellAmount.set(0);//     分钟分钟销售金额
                lastUpdateTime = currentTime;
            }
            trade++;
            trade++;
            trade++;
            trade++;
            minuteTradeCount.addAndGet(tradeCount);
            minuteTradeAmount.addAndGet((long)tradeAmount*2);
            minute2tradeAmount.addAndGet((long)tradeAmount*2);
            minute3tradeAmount.addAndGet((long)tradeAmount*2);
            minute4tradeAmount.addAndGet((long)tradeAmount*2);
            minute5tradeAmount.addAndGet((long)tradeAmount*2);
            // 根据买入或卖出更新相应的值
            if (detail.getTrade_type().equals("买入")) {
                totalBuyAmount.addAndGet(tradeCount*2);  //总买入量
                minuteBuyAmount.addAndGet(tradeCount*2);  //分钟买入量
            } else if (detail.getTrade_type().equals("卖出")) {
                totalSellAmount.addAndGet(tradeCount*2);  //总卖出量
                minuteSellAmount.addAndGet(tradeCount*2);  //分钟卖出量
            }
            long currentTimeZj = System.currentTimeMillis();
            if (currentTimeZj - lastUpdateTime > 60000) {
                guolianTradeVolume.set(0);  //   分钟贸易计数
                tongdaxinTradeVolume.set(0);//    分钟交易额
                changchengTradeVolume.set(0);//      分钟购买金额
                guotaijunanTradeVolume.set(0);//     分钟分钟销售金额
                yinheTradeVolume.set(0);//      分钟购买金额
                tonghuashunTradeVolume.set(0);//     分钟分钟销售金额
                lastUpdateTime = currentTimeZj;
            }

            Random random = new Random();
            int randomNumber = random.nextInt(31) + 60;

            //更新券商交易量
            if (detail.getTrade_platform().equals("国联证券")) {
                guolianTradeVolume.addAndGet(tradeCount*2);  //统计数量国联证券
            }
            else if (detail.getTrade_platform().equals("通达信")) {
                tongdaxinTradeVolume.addAndGet(tradeCount*2); //统计数量通达信
            }
            else if (detail.getTrade_platform().equals("长城证券")) {
                changchengTradeVolume.addAndGet(tradeCount*2);//统计数量
            }
            else if (detail.getTrade_platform().equals("国泰君安证券")) {
                guotaijunanTradeVolume.addAndGet(tradeCount*2);//统计数量
            }
            else if (detail.getTrade_platform().equals("银河证券")) {
                yinheTradeVolume.addAndGet(tradeCount*2);//统计数量
            }
            else if (detail.getTrade_platform().equals("同花顺")) {
                tonghuashunTradeVolume.addAndGet(tradeCount*2);//统计数量
            }
            Map<String,Long> tradeVolumes = new HashMap<>();
            tradeVolumes.put("国联证券",guolianTradeVolume.get());
            tradeVolumes.put("通达信",tongdaxinTradeVolume.get());
            tradeVolumes.put("长城证券",changchengTradeVolume.get());
            tradeVolumes.put("国泰君安证券",guotaijunanTradeVolume.get());
            tradeVolumes.put("银河证券",yinheTradeVolume.get());
            tradeVolumes.put("同花顺",tonghuashunTradeVolume.get());
            Map<Long,String> sortedVolumes = new TreeMap<>();
            for (Map.Entry<String,Long> entry : tradeVolumes.entrySet()) {
                sortedVolumes.put(entry.getValue(),entry.getKey());
            }
            Long value_zq1 = null;
            Long value_zq2 = null;
            Long value_zq3 = null;
            String platform1 = null;
            String platform2 = null;
            String platform3 = null;
            int count = 0;
            for (Map.Entry<Long,String> entry : ((TreeMap<Long,String>) sortedVolumes).descendingMap().entrySet()) {
                if (count >= 3) {
                    break;
                }
                if (count == 0) {
                    value_zq1 = entry.getKey();
                    platform1 = entry.getValue();
                } else if (count == 1) {
                    value_zq2 = entry.getKey();
                    platform2 = entry.getValue();
                } else if (count == 2) {
                    value_zq3 = entry.getKey();
                    platform3 = entry.getValue();
                }
                count++;
            }

            //更新地图数据
            DetailBoltMySql.values v = valueMAP.computeIfAbsent(stockId,k -> new DetailBoltMySql.values());
            v.value.addAndGet(tradeCount*3);

            //更新各个服务类型的总金额
            DetailBoltMySql.servers s = serverMAP.computeIfAbsent(server,k -> new DetailBoltMySql.servers());
            s.server.addAndGet((long) tradeAmount*3);

            //更新不同股票数据总交易金额
            DetailBoltMySql.StockTradeInfo stockTradeInfo = stockTradeInfoMap.computeIfAbsent(stockId,k -> new DetailBoltMySql.StockTradeInfo());
            stockTradeInfo.totalTradeAmount.addAndGet((long) tradeAmount*3);

            // 更新近一分钟的交易金额
            long Time = System.currentTimeMillis();
            if (Time - stockTradeInfo.lastUpdateTime > 60000) {
                stockTradeInfo.minuteTradeAmount.set(0);
                stockTradeInfo.lastUpdateTime = Time;
//                trade = 10;
            }

            stockTradeInfo.minuteTradeAmount.addAndGet((long) tradeAmount);

            if (detail.getTrade_type().equals("买入")) {
                trade++;
                stockTradeInfo.totalBuyAmount.addAndGet(tradeCount);
            } else if (detail.getTrade_type().equals("卖出")) {
                stockTradeInfo.totalSellAmount.addAndGet(tradeCount);
            }

4.4.网页前后端交互

本文主要使用datav 框架实现网页的前后端交互,后端运行服务,读取 mysql 中的数据

统计结果并发送至datav

第五章 功能实现

5.1.使用说明

本文按照方案流程对电商订单数据进行实时展示。首先将 Storm 的项目 jar 包上传虚拟

机集群,运行拓扑;接着启动 Kafka 生产者,将实时增加的 MySQL 数据发送至 Storm 的计算

平台,Storm 中的 Bolt 会将数据统计结果写入 mysql 中。

5.1.1. 启动 ZookeeperKafkaStorm

5.1.3. 上交拓扑至 Storm 集群

在基本的环境准备工作完成后,下一步是上传设计的拓扑至 Storm 集群中,这里我们是

IDEA 编写的 Storm 程序打成 jar 提交至集群,提交成功后,控制台打印信息。

另外,我们可以在 StormUI 页面查看此时 Topology Summary 已经存在一个名为 mytop 的拓

扑,如图 24 所示,页面会显示其已运行时间及状态,在拓扑内容可查看方案设计中的两个

Spout 和两个 Bolt,并且拓扑中的 Worker 由两个从节点 Slave1 Slave2 组成。

5.1.4. 启动订单数据模拟器

如图 27 所示,我们在订单数据模拟器中输入 MySQL 所在的 IP 地址及用户名,点击创

建数据库并开始写入数据。

5.1.5. 运行 Kafka 程序

订单数据成功开始产生后,我们需要启动 IDEA Kafka 程序,程序主要实现了对接 My

SQL 实时数据和将数据发送至 Topic 的功能,运行成功后,可在控制台实时看到接收的订单

数据。

5.2.界面效果

5.2.1. 页面总体效果

页面的总体效果如图所示,页面主要分为七个板块:

功能1—实时展示订单速度

功能2—股票行业分层,含各行业实时金额排序:

功能3—股票交易额排行

功能4—实时平台排名,与平台实时的交易额

功能5—销量最高股票的买入卖出情况

功能6—股票交易总额实时交易额

功能7—股票实时交易量总体情况与排行

功能8—股票总交易量总体情况与排行

功能9—全国各地交易次数热力图

第六章 实验设计与结果分析

6.1.项目运行环境

6.1.1. 本地计算机系统配置

1 本地计算机系统配置

操作系统:Windows10

软件版本:python2022,idea2022

Doker配置:storm2.5.0(nimbus2.5.0,supervisor2.5.0);Mysql5.7;Kafka:latest;zookeeper:lastest

6.1.2. docker配置

安装 ZookeeperKafkaStorm

6.2.结果分析

第七章 项目特色

本项目的特色之处如下:

  1. 使用 Storm 作为流计算系统,计算效率稳定;2. 使用 datav 作为实时数据存储,操作简单,
  2. 数据库只存放最终数据,所以计算过程放在execute中。

3. 网页实现可在 0.5s 内刷新一次,且能够通过动态的动画展示数据变化;

4. 订单接收速率和订单模拟器产生速率的误差最低能够在 1%以内(见第六章);

5. 数据延迟平均保持在 1 秒以内(见图 39 和图 40);

6. 项目实现了 7 个数据统计模板;

第八章 问题分析

 1.拓扑已经上传,但是并没有进行计算:

场景:将csv文件放在本地项目中,测试代码能否跑通,结果拓扑一直都能上传,但是一直没有进行计算

最后找到源码的出处询问原因才知道:拓扑结构提交后是无法读取本地数据的。所以一开始的方向是错误的。

2.依赖冲突:一开始使用的storm是最高版本,在idea中的storm相关依赖都是2.5.0版本,于是报有以下错误:

​编辑

最后将依赖的版本逐个降低到2.3.0何1.0.0都能成功运行。

3.在storm中上传jar报,提交拓扑时,找不到jar报中的主类:

​编辑

经过进一步的探究,原因是我打包有问题,一开始不清楚该如何打包,我使用的都是传统的打包方式,那种方式只是将java文件编译了一遍放入jar包中,相关的依赖包都没有导入,后来又使用了idea自带的打包方式,也有这种问题。通过查阅maven的相关使用方法系统学习了打包方式,才正确打包。maven的5种打包方式,终有一款适合你1_maven打包-CSDN博客

4.解决打包问题后,又遇到了新的与打包相关的问题:打包的jar包中有一个yaml文件,有storm中原带的同名yaml文件冲突:这时候有两种解决方法,一种是排除掉storm集群中的yaml文件。

​编辑

最后的解决方法:最后在pom文件的打包配置中,引入一个组件,用来排除原java项目中的yaml,直接就解决了。

​编辑

5.一开始不知道DataV使用的是mysql的时候,是将数据放入redis中,遇到了一个相关问题:

即在本地使用LocalCluster方法运行拓扑,能过正常的将数据写入到redis中,但是一旦将jar包放入到storm集群中,前面的能够全部连通,但是数据无法写入到redis中。

编辑

解决方法:根据报错结果分析,应该是路径问题,但是最终还没解决,发现datav使用的mysql就改用sql没管这个问题了。

​​

1.阿里云的DataV无法实时展示:

在这个时候我的sql里的数据都是在实时变化的,但是Datav数据不能实时变化,有以下两点原因:

在数据库设置中,需要设置数据每秒自动请求:

除此之外,不能使用全局变量去获取数据:

2.在idea中,无法连接到外部的kafka:

无论是直接使用ip还是使用localhost都无法找到外部的idea:

​编辑

​编辑在后来在host文件中配置了kafka1:172.0.0.1后,发现连接成功:

​编辑

配置后发现,无论是使用localhost还是使用kafka1,2,3都可以连接到了。这很奇怪,因为直接使用localhost本质上就是使用172.0.0.1,使用kafka1,2,3也是使用172.0.0.1.但为什么配置以上的内容之后就可以?

3.storm集群搭建问题

使用docker-compose.yaml文件之后,strom无法连接到zookeeper集群:

​编辑

这时需要去storm.yaml文件中更改配置文件,手动的将storm与zookeeper连接上

​编辑

4.storm集群的搭建出现问题,具体参考:docker-compose搭建storm、zookeeper集群,解决Could not find leader nimbus from seed hosts [localhost]问题-CSDN博客

5.重复读取消息队列中的内容,在将新生产的消息消费完成之后,offset(偏移量)又会到一个之前已经消费过的地方开始继续消费,导致一直能有消息在进行消费:

在配置中,配置以上的内容都发现没有用,仍然有这个问题,之后,借鉴了相关的代码:

第九章 心得体会

完成这次大作业的过程给我带来了许多新鲜的感觉,这是我第一次从真正意义上认识到

大数据专业目前在社会上的实际应用场景。刚开始搭建 Kafka 的部分并没有让我早早地看清

大作业任务的艰巨,直到进行使用 Kafka 对接 MySQL 数据的环节,才让我知道什么是寸步

难行、步履维艰。

在进行到大作业的后半段,从摸索着如何写一个 Storm 程序、如何将 Kafka 的数据再传

Storm、如何在 Storm Bolt 中实现对数据的统计等等过程当中,我对于搭建实时计算平

台的流程越来越熟练,越来越清晰,直到结束之际,我已经能根据所搭建的实时计算平台流

畅的画出架构图,并对其中的原理、各组件的功能、操作流程基本了如执掌。如果对这学期

刚开始的我而言,这可能是无法想象的事情,所以当我看到自己实现的大数据看板在不断的

跳动、变化时,内心的成就感油然而生。

在整个项目的开发过程中,除了最后实现它所收获的成就与喜悦,还出现了一堆十分让

我头疼的小插曲。比如,上传 jar 包到 Storm 集群上报错,报错信息的解决方案在网上根本

找不着,所以只能自己一个一个去排除可能的问题。类似这种情况的问题还有很多,有的时

候为了解决一个报错耗费了将近两小时也没找到正确解决的办法,但是在解决这些问题的过

程中,也让我对各个组件功能的了解进一步加深。

原文地址:https://blog.csdn.net/weixin_64687170/article/details/135364402

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署