storm连接kafka,storm整合mybatis并连接mysql

课题题目:

1. 课题背景

某股票交易机构已上线一个在线交易平台,平台注册用户量近千万,每日均 接受来自全国各地的分支机构用户提交的交易请求。鉴于公司发展及平台管理要 求,拟委托开发一个在线实时大数据系统,可实时观测股票交易大数据信息,展 示部分重要业绩数据。

2. 数据源

为提供更真实的测试环境,公司的技术部门委托相关人员已设计了一个股票 交易数据模拟器,可模拟产生客户在平台中下单的信息,数据会自动存入指定文 件夹中的文本文件。 该模拟器允许调节进程的数量,模拟不同量级的并发量,以充分测试系统的 性能。数据的具体字段说明详见下表:

(1) 数据字段说明

序号 字段名 中文含义 备注

1 stock_name 股票名

2 stock_code 股票代码

3 time 交易时间 时间戳

4 trade_volume 交易量 数值型

5 trade_price 交易单价 数值类型

6 trade_type 交易类型 [买入,卖出]

7 trade_place 交易地点 交易所在省份

8 trade_platform 交易平台 该交易发生的分支交易平台

9 industry_type 股票类型 该股票所在行业

(2) 数据模拟器使用说明

该模拟器能够模拟用户在平台中下单的行为,并且能够生成包括股票代码、收盘 价格、交易量、交易时间等详细信息的模拟交易数据。模拟器能够生成指定条股 票数据,生成的数据会自动存储到指定文件夹中,方便后续的数据分析和测试验 证。

⚫ 文件夹中包含“place.json”和“stock.exe” “place.json”是地址生成文件,必须与“stock.exe”在同一文件夹下。

⚫ 运行“stock.exe”,初始默认股票数目为 10 支

(3) 任务要求:

a) 订单的已处理速度,单位为“条/秒”;

b) 近 1 分钟与当天累计的总交易金额、交易数量;

c) 近 1 分钟与当天累计的买入、卖出交易量;

d) 近 1 分钟与当天累计的交易金额排名前 10 的股票信息;

e) 近 1 分钟与当天累计的交易量排名前 10 的交易平台;

f) 展示全国各地下单客户的累计数量(按省份),在地图上直观展示;

g) 展示不同股票类型的交易量分布情况;

环境:

在三台CentOS虚拟机中配置好docker,并使用docker配置好zookeeper集群、kafka集群、storm集群并且能正常运行,kafka集群和storm集群能正常连接虚拟机外部的主机。

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaStormMySQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.13</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-jdbc</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>5.0.1</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

storm连接kafka

1.创建并配置Kafka消费者

定义一个名为 MyKafkaConsumer 的类,用于连接到 Kafka 集群并从指定的主题中读取数据。

package org.storm.spout;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {
    private KafkaConsumer<String,String> consumer;

    public MyKafkaConsumer(String bootstrapServers,String groupId,String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic)); // 订阅主题
    }

    public void subscribe(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
    }

    public ConsumerRecords<String,String> poll() {
        return consumer.poll(Duration.ofMillis(1000));
    }

    public void close() {
        consumer.close();
    }
}

2.创建Spout,从kafka中读取数据并发送到storm的拓扑中

定义一个名为 StockTransactionSpout 的 Spout,用于从 Kafka 中读取股票交易数据并将其发送到 Storm 的拓扑中。

其中的BOOTSTRAP_SERVERS要根据自己的虚拟机ip设置

package org.storm.spout;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Map;

public class StockTransactionSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private MyKafkaConsumer myKafkaConsumer;


    private static final String BOOTSTRAP_SERVERS = "192.168.10.102:9092,192.168.10.103:9092,192.168.10.104:9092";
    private static final String TOPIC_NAME = "stock-data";
    private static final String GROUP_ID = "stock-transaction-group";

    @Override
    public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
        this.collector = collector;
        this.myKafkaConsumer = new MyKafkaConsumer(BOOTSTRAP_SERVERS,GROUP_ID,TOPIC_NAME);
        // Initialize your KafkaConsumer here
    }

    @Override
    public void nextTuple() {
        ConsumerRecords<String,String> records = myKafkaConsumer.poll();
        for (ConsumerRecord<String,String> record : records) {
            String[] fields = record.value().split(",");

            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date parsedDate;
            try {
                parsedDate = dateFormat.parse(fields[0]);
            } catch (ParseException e) {
                e.printStackTrace();
                continue;  // 如果日期解析失败,跳过这条记录
            }

            Timestamp time = new java.sql.Timestamp(parsedDate.getTime());
            String stockCode = fields[1];
            String stockName = fields[2];
            double tradePrice = Double.parseDouble(fields[3]);
            int tradeVolume = Integer.parseInt(fields[4]);
            String tradeType = fields[5];
            String tradePlace = fields[6];
            String tradePlatform = fields[7];
            String industryType = fields[8];

            Values values = new Values(stockName,stockCode,time,tradeVolume,tradePrice,tradeType,tradePlace,tradePlatform,industryType);
            collector.emit(values);
        }
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    // Implement other necessary methods...

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("stockName","stockCode","time","tradeVolume","tradePrice","tradeType","tradePlace","tradePlatform","industryType"));
    }

    @Override
    public Map<String,Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void close() {
        // 在这里添加关闭资源的代码
        // 例如,如果您在Spout中打开了一些资源(如数据库连接、文件等),那么应该在这里关闭它们
        if (myKafkaConsumer != null) {
            myKafkaConsumer.close();
        }
    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }
}

storm整合mybatis并连接mysql

因篇幅限制,这里仅展示一个bolt,即统计不同股票类型的交易量分布情况的bolt的具体结构

1.定义实体类

package org.storm.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class IndustryTradeVolume {
    private String industry_type;
    private int trade_volume;

}

2.定义mapper类

package org.storm.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.storm.pojo.IndustryTradeVolume;

import java.util.List;

//@Mapper
public interface IndustryTradeVolumeMapper {
    void insertAllIndustryTradeVolumes(@Param("industry_type") String industry_type,@Param("trade_volume") int trade_volume);

}

3.定义Service类

package org.storm.service;

import org.storm.pojo.IndustryTradeVolume;
import java.util.List;

public interface IndustryTradeVolumeService {
    void insertAllIndustryTradeVolumes(String industry_type,int trade_volume);
}
package org.storm.service.Impl;

import org.storm.mapper.IndustryTradeVolumeMapper;
import org.storm.pojo.IndustryTradeVolume;
import org.storm.service.IndustryTradeVolumeService;

import java.util.List;

public class IndustryTradeVolumeServiceImpl implements IndustryTradeVolumeService {

    private IndustryTradeVolumeMapper industryVolumeMapper;
    @Override
    public void insertAllIndustryTradeVolumes(String industry_type,int trade_volume){
        industryVolumeMapper.insertAllIndustryTradeVolumes(industry_type,trade_volume);
    }
}

4.MyBatis映射

每个mapper均需要MyBatis映射

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.storm.mapper.IndustryTradeVolumeMapper">
    <!--统计不同股票类型的交易量分布情况-->
    <select id="insertAllIndustryTradeVolumes">
        INSERT INTO industry_volume (`industry_type`,`trade_volume`) VALUES (#{industry_type},#{trade_volume}) ON DUPLICATE KEY UPDATE trade_volume = trade_volume + VALUES(`trade_volume`)
    </select>
</mapper>

5.MyBatis配置文件

修改数据库、用户名及密码

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <environments default="mysql">
        <environment id="mysql">
            <!--配置事务管理器 -->
            <transactionManager type="JDBC"></transactionManager>
            <!-- 配置数据源-->
            <dataSource type="POOLED">
                <property name="driver" value="com.mysql.cj.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://localhost:3306/stock_db?useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8&amp;
                          serverTimezone=Asia/Shanghai"/>
                <property name="username" value="root"/>
                <property name="password" value="1221005a"/>
            </dataSource>
        </environment>
    </environments>

    <!--指明映射文件的位置。-->
    <mappers>
        <mapper resource="mapper/IndustryVolumeMapper.xml"></mapper>
        <mapper resource="mapper/OrderSpeedMapper.xml"></mapper>
        <mapper resource="mapper/ProvinceCustomerCountMapper.xml"></mapper>
        <mapper resource="mapper/PlatformVolumeMapper.xml"></mapper>
        <mapper resource="mapper/TransactionAmountMapper.xml"></mapper>
        <mapper resource="mapper/TradeInfoMapper.xml"></mapper>
        <mapper resource="mapper/TradeVolumeMapper.xml"></mapper>
        <mapper resource="mapper/TransactionInfoMapper.xml"></mapper>
    </mappers>

</configuration>

6.创建SqlSessionFactoryBuilder类

package org.storm.config;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.IOException;
import java.io.Reader;

public class SqlSessionConfig {
    public static SqlSessionFactory GetConn() {
        Reader reader = null;
        try {
            reader = Resources.getResourceAsReader("mybatis.xml");
            return new SqlSessionFactoryBuilder().build(reader);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

7.创建bolt类

package org.storm.bolt;

import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.storm.config.SqlSessionConfig;
import org.storm.mapper.IndustryTradeVolumeMapper;
import org.storm.mapper.TradeVolumeMapper;
import org.storm.service.IndustryTradeVolumeService;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.HashMap;
import java.sql.PreparedStatement;


//统计不同股票类型的交易量分布情况
public class IndustryTradeVolumeBolt extends BaseRichBolt {
    private OutputCollector collector;

    IndustryTradeVolumeService industryTradeVolumeService;
    IndustryTradeVolumeMapper industryTradeVolumeMapper;

    @Override
    public void prepare(Map<String,Object> topoConf,OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String industryType = input.getStringByField("industryType");
        int tradeVolume = input.getIntegerByField("tradeVolume");

        SqlSessionFactory factory = SqlSessionConfig.GetConn();
        SqlSession session = factory.openSession();
        IndustryTradeVolumeMapper db = session.getMapper(IndustryTradeVolumeMapper.class);
        db.insertAllIndustryTradeVolumes(industryType,tradeVolume);
        session.commit();
        session.close();

//        industryTradeVolumeMapper.insertAllIndustryTradeVolumes(industryType,tradeVolume);
        // 确认处理成功
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 本例中不输出任何字段
    }
}

8.创建Topology类

package org.storm;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.utils.Utils;
import org.storm.bolt.*;
import org.storm.spout.StockTransactionSpout;


public class StockTransactionTopology {

    public static void main(String[] args) throws Exception {

        // 创建TopologyBuilder实例
        final TopologyBuilder builder = new TopologyBuilder();

        // 设置Spout
        builder.setSpout("kafka_spout",new StockTransactionSpout(),1).setNumTasks(1);

        // 设置Bolts
        builder.setBolt("order-speed-bolt",new OrderSpeedBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("transaction-statistics-bolt",new TransactionStatisticsBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("trade-volume-bolt",new TradeVolumeBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("province-customer-count-bolt",new ProvinceCustomerCountBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("industry-trade-volume-bolt",new IndustryTradeVolumeBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("top-platforms-bolt",new TopPlatformsBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("top-transactions-bolt",new TopTransactionsBolt(),1).shuffleGrouping("kafka_spout");
        builder.setBolt("trade-info-bolt",new TradeInfoBolt(),1).shuffleGrouping("kafka_spout");
        Config conf = new Config();
        conf.setDebug(true);

//        // 提交Topology
//        StormSubmitter.submitTopology("stock-transaction-topology",conf,builder.createTopology());

        //        本地测试
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("myTopology",builder.createTopology());


//        // 运行10分钟后停止
        Utils.sleep(600000);
        cluster.killTopology("myTopology");
        cluster.shutdown();
    }

    private static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(String bootstrapServers,String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers,topic)
                // 除了分组ID,以下配置都是可选的。分组ID必须指定,否则会抛出InvalidGroupIdException异常
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"stock-transaction-group")
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,200)
                // 定义重试策略
                .setRetry(getRetryService())
                // 定时提交偏移量的时间间隔,默认是15s
                .setOffsetCommitPeriodMs(10000)
                .setMaxUncommittedOffsets(200)
                .build();
    }

    // 定义重试策略
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),Integer.MAX_VALUE,KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}

代码完成后,本地测试直接运行即可,若要打包上传到虚拟机需注释掉:

        //        本地测试
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("myTopology",builder.createTopology());

并取消注释这段代码:

//        // 提交Topology
//        StormSubmitter.submitTopology("stock-transaction-topology",builder.createTopology());

原文地址:https://blog.csdn.net/Huailizhi/article/details/135017561

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署