ClickHouse For Kafka

为使用ClickHouse 消费Kafka 实时数据的同学提供一些参考

一 架构流程图:

可以看到ClickHouse 内置Kafka 消费引擎,不需要我们业务方写新的消费程序,再往ClickHouse 导入数据

二 前提条件:

  • 已创建Kafka集群,且在生产数据
  • 已创建云数据库 CDW-ClickHouse集群

三 使用限制:

Kafka集群和ClickHouse集群需要在同一VPC下。

四 操作步骤:

这里忽略Kafka 集群本身的一些操作,以上三个步骤是可以调整顺序的

  1. Kafka Table Engine: 在ClickHouse 内部创建Kafka消费表(这里可以理解为 消费了一部分Kafka 表的一个队列,存储消费Kafka Topic的一部分数据)
  2. MergeTree Table Engine: 在ClickHouse 内部创建 Kafka 数据存储表
  3. MATERIALIZED Table: 在ClickHouse 内部创建 Kafka 消费表,这里可以理解为它是一个搬运者,将 Kafka Table Engine 挪到 MergeTree Table Engine

五 操作步骤详解:

1 Kafka Table Engine

CREATE TABLE IF NOT EXISTS data_sync.test_queue(
  name String,
  age int,
  gongzhonghao String,
  my_time DateTime64(3, 'UTC')
) ENGINE = Kafka
SETTINGS
kafka_broker_list = '172.16.16.4:9092',
kafka_topic_list = 'lemonCode',
kafka_group_name = 'lemonNan',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 1

名称

是否必选

说明

kafka_broker_list

Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。

kafka_topic_list

Kafka topic,多个 topic 用逗号分隔。

kafka_group_name

Kafka 的消费组名称。

kafka_format

Kafka 数据格式, ClickHouse 支持的 Format, 详见 文档 可选参数。

kafka_row_delimiter

行分隔符,用于分割不同的数据行。默认为“\n”,您也可以根据数据写入的实际分割格式进行设置。

kafka_num_consumers

单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。

kafka_max_block_size

Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。

kafka_skip_broken_messages

表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。

kafka_commit_every_batch

执行 Kafka commit 的频率,取值如下: 0:完全写入一整个Block数据块的数据后才执行commit; 1:每写完一个Batch批次的数据就执行一次commit。

kafka_auto_offset_reset

从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。

2 MergeTree Table Engine

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

3 MATERIABLIZED Table Engine

CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;

六 如何维护

1 停止消费Kafka 数据

DETACH TABLE cppla.kafka_readings_view ;

2 恢复消费Kafka 数据

ATTACH TABLE cppla.kafka_readings_view ;

七 分布式写入

1 写入ClickHouse 分布式表

2 Kafka Engine 消费不同分区

八 数据高可用方案

1 ClickHouse ReplicateMergeTree 内部机制保证:

2 ClickHouse 双写保证

九 更新

采用 MergeTree + MATERIALIZED + AggregatingMergeTree

1 MergeTree

CREATE TABLE base
(
 `i` Int64,
 `s` String,
 `v` DateTime64,
 `id` Int64 DEFAULT 0,
 `tag` String DEFAULT '',
 `level` Int32 DEFAULT 0
)
ENGINE = MergeTree 
PARTITION BY (i % 256)
ORDER BY (i, s)

2 Aggregating MergeTree

CREATE TABLE updating
(
 `i` Int64,
 `s` String,
 `version` AggregateFunction(max, DateTime64),
 `id` AggregateFunction(argMaxIf, Int64, DateTime64, UInt8),
 `tag` AggregateFunction(argMaxIf, String, DateTime64, UInt8),
 `level` AggregateFunction(argMaxIf, Int32, DateTime64, UInt8)
)
ENGINE = AggregatingMergeTree
PARTITION BY (i % 256)
ORDER BY (i, s)

3 MATERIALIZED

CREATE MATERIALIZED VIEW mv TO updating AS
SELECT
 i,
 s,
 maxState(v) AS version,
 argMaxIfState(id, v, id != 0) AS id,
 argMaxIfState(tag, v, tag != '') AS tag,
 argMaxIfState(level, v, level != 0) AS level
FROM base
GROUP BY (i, s)

4 查询

SELECT
 i,
 s,
 maxMerge(version),
 argMaxIfMerge(id),
 argMaxIfMerge(tag),
 argMaxIfMerge(level)
FROM updating
GROUP BY
 i,
 s

希望对阅读的您有所帮助

原文地址:https://cloud.tencent.com/developer/article/2025724

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340