基于华为MRS3.2.0实时Flink消费Kafka落盘至HDFS的Hive外部表的调度方案


该需求为实时接收对手Topic,并进行消费落盘至Hive。

在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。

在这里插入图片描述

本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。

华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的认证与环境准备

着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。

保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。

运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 创建一个测试主题

在Linux环境中执行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。

同时也可以开启两个新的终端窗口用于测试生产者和消费者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消费主题的接收测试

通过以下网站下载华为MRS所需的样例代码:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。

镜像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

华为开源镜像站:

https://mirrors.huaweicloud.com/home

完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。

在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。

创建生产测试时,首先需要修改KafkaProperties类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2小节中开启的消费者窗口就能接受到生产的数据。

在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。

至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。

如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的认证与环境准备

用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。

在这里插入图片描述


图为Flink在华为MRS安全模式的认证体系。

对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink/flink-checkpoint目录获取权限,保证执行。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。

Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任务的开发

最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。

在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:

 if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }

对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。

在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。

FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd,这样便于使用shell调度每日增量数据时日期变量的传递。

FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)开启checkpoint,该参数是60秒开启一次。

完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run提交到yarn,此时可以指定并行度及其他配置。

通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd命名的文件夹中。

3 HDFS与Hive

HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。

3.1 Shell脚本的编写思路

  1. source华为的环境,认证状态成功;

  2. 创建日期变量:c_date=$(date '+%Y-%m-%d')

  3. 在beeline -u中执行HiveSQL代码:

    • 使用beeline的变量函数--hivevar将在外部注册的c_date变量注册为hive beeline的变量;

    • 创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;

    • 将临时表中的数据解析,一般是json数据,可通过get_json_object函数解析为字段,insert into table存入贴源层正式表;

    • 删除临时表;

  4. 有需要的话,也可以添加日志路径,将执行结果追加至日志。

3.2 脚本测试方法

该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd格式,与flink写入在HDFS中的每日增量文件夹名相同;

然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;

解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。

需要注意的是,hive -e命令似乎由于认证安全设置,无法在华为集群节点机使用。

4 DolphinScheduler

通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。

需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。

原文地址:https://blog.csdn.net/qq_31412425/article/details/135657442

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署