技本功丨用短平快的方式告诉你:Flink-SQL的扩展实现

2019年1月28日,阿里云宣布开源“计算王牌”实时计算平台Blink回馈给ApacheFlink社区。官方称,计算延迟已经降到毫秒级,也就是你在浏览网页的时候,眨了一下眼睛,淘宝、天猫处理的信息已经刷新了17亿次。

1.jpg

作为一家对技术有追求、有渴望的公司,怎么少得了为Flink社区做些贡献呢?

 

2.jpg

夫子说

首先,本文所述均基于flink 1.5.4。

我们为什么扩展Flink-SQL?

由于Flink 本身SQL语法并不提供在对接输入源和输出目的的SQL语法。数据开发在使用的过程中需要根据其提供的Api接口编写Source和 Sink, 异常繁琐,不仅需要了解FLink 各类Operator的API,还需要对各个组件的相关调用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要关联到外部数据源的时候没有提供SQL相关的实现方式,因此数据开发直接使用Flink编写SQL作为实时的数据分析时需要较大的额外工作量。

我们的目的是在使用Flink-SQL的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,专注于业务逻辑。

接下来,我们一起来看下Flink-SQL的扩展实现吧!

 

01扩展了哪些flink相关sql

(1)创建源表语句

3.jpg

(2)创建输出表语句

4.jpg

(3)创建自定义函数

5.jpg

(4)维表关联

6.jpg

02各个模块是如何翻译到flink的实现

( 1 ) 如何将创建源表的sql语句转换为flink的operator;

Flink中表的都会映射到Table这个类。然后调用注册方法将Table注册到environment。

StreamTableEnvironment.registerTable(tableName, table);

当前我们只支持kafka数据源。Flink本身有读取kafka 的实现类, FlinkKafkaConsumer09,所以只需要根据指定参数实例化出该对象。并调用注册方法注册即可。

另外需要注意在flink sql经常会需要用到rowtime, proctime, 所以我们在注册表结构的时候额外添加rowtime,proctime。

当需要用到rowtime的使用需要额外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定义watermark主要做两个事情:1:如何从Row中获取时间字段。 2:设定最大延迟时间。

( 2 ) 如何将创建的输出表sql语句转换为flink的operator;

Flink输出Operator的基类是OutputFormat, 我们这里继承的是RichOutputFormat, 该抽象类继承OutputFormat,额外实现了获取运行环境的方法getRuntimeContext(), 方便于我们之后自定义metric等操作。

我们以输出到mysql插件mysql-sink为例,分两部分:

  • 将create table 解析出表名称,字段信息,mysql连接信息。

该部分使用正则表达式的方式将create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连接信息。

  • 继承RichOutputFormat将数据写到对应的外部数据源。

主要是实现writeRecord方法,在mysql插件中其实就是调用jdbc 实现插入或者更新方法。

( 3) 如何将自定义函数语句转换为flink的operator;

Flink对udf提供两种类型的实现方式:

(1)继承ScalarFunction

(2)继承TableFunction

需要做的将用户提供的jar添加到URLClassLoader, 并加载指定的class (实现上述接口的类路径),然后调用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注册。之后即可使用改定义的udf;

( 4 ) 维表功能是如何实现的?

流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,但是当前flink并未提供join外部数据源的SQL功能。

实现该功能需要注意的几个问题:

(1)维表的数据是不断变化的

在实现的时候需要支持定时更新内存中的缓存的外部数据源,比如使用LRU等策略。

(2)IO吞吐问题

如果每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里我们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。

(3)如何将sql 中包含的维表解析到flink operator   

为了从sql中解析出指定的维表和过滤条件, 使用正则明显不是一个合适的办法。需要匹配各种可能性。将是一个无穷无尽的过程。查看flink本身对sql的解析。它使用了calcite做为sql解析的工作。将sql解析出一个语法树,通过迭代的方式,搜索到对应的维表;然后将维表和非维表结构分开。

7.jpg

通过上述步骤可以通过SQL完成常用的从kafka源表,join外部数据源,写入到指定的外部目的结构中。


原文地址:https://blog.51cto.com/13766600/2370350

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的处理数据是固定的离线计算是有延时的,T+1离线计算是数据处理完输出结果,只是输出最终结果离线计算相对可以处理复杂的计算1.2实时计算实时计算是实时的处理数据,数据从流入到计算出结果延迟低实时计算是输
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能:新增工作流新增OceanBaseSQL新增Flinkjar任务数据同步、实时采集支持脏数据管理HiveUDF控制台UI升级租户绑定简化新版本的使用文档已在社区中推送,大家可以随时下载查阅,欢迎大家体验新版本功能
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽。本文主要记录一下Java使用Flink的简单例子。首先,去官网下载Flink的zip包(链接就不提供了,你已经是个成熟的程序员了,该有一定的搜索能力了),解压后放到你想放的地方。进入主目录后,是这样子的 image.png你可以简
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasreproducedhereunderthe ApacheLicense,Version2.0.ThisblogpostprovidesanintroductiontoApacheFlink’sbuilt-inmonitoringandmetricssystem,thatallowsdeveloperstoeffectively
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下的flink-conf.yaml:经过初步的调整,大约有以下模块的参数(未优化)LicensedtotheApacheSoftwareFoundation(ASF)underoneormorecontributorlicenseagreements.SeetheNOTICEfiledistributedwiththis
1.mac平台安装flink(默认最新版)brewinstallapache-flink安装结果:Version1.7.1,commitID:89eafb42.jdk版本,我尝试使用了Java8和Java11,都能兼容3.在flink的安装目录下,启动flink目录一般默认在/usr/local/Cellar/apache-flink/1.7.1/(查找flink安装目录:find/-name
课程目标:学完该课程大家会对Flink有非常深入的了解,同时可以体会到Flink的强大之处,以及可以结合自己公司的业务进行使用,减少自己研究和学习Flink的时间。适合人群:适合有大数据开发基础和flink基础的同学。在开始学习前给大家说下什么是Flink? 1.Flink是一个针对流数据和批数据的
本文主要研究一下flink的NetworkEnvironmentConfigurationNetworkEnvironmentConfigurationflink-1.7.2/flink-runtime/src/main/java/org/apache/flinkuntimeaskmanager/NetworkEnvironmentConfiguration.javapublicclassNetworkEnvironmentCon
January22,2019 UseCases, ApacheFlinkLasseNedergaard   Recentlytherehasbeensignificantdiscussionaboutedgecomputingasamajortechnologytrendin2019.Edgecomputingbrings computingcapabilitiesawayfromthecloud,andrathercloset
1DataStreamAPI1.1DataStreamDataSources   source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。   flink提供了大量的已经实现好的source方法,可以自定义source   通过实现sourceFunction接口来
基于Flink流处理的动态实时亿级全端用户画像系统课程下载:https://pan.baidu.com/s/1YtMs-XG5-PsTFV9_7-AlfA提取码:639m项目中采用到的算法包含LogisticRegression、Kmeans、TF-IDF等,Flink暂时支持的算法比较少,对于以上算法,本课程将手把手带大家用Flink实现,并且结合真实场景,
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
 flink集群安装部署 standalone集群模式 必须依赖必须的软件JAVA_HOME配置flink安装配置flink启动flink添加Jobmanageraskmanager实例到集群个人真实环境实践安装步骤 必须依赖必须的软件flink运行在所有类unix环境中,例如:linux、mac、或
1Flink的前世今生(生态很重要)很多人可能都是在2015年才听到Flink这个词,其实早在2008年,Flink的前身已经是柏林理工大学一个研究性项目,在2014被Apache孵化器所接受,然后迅速地成为了ASF(ApacheSoftwareFoundation)的顶级项目之一。   ApacheFlinkisanopensource
序本文主要研究一下flink的CsvTableSourceTableSourceflink-table_2.11-1.7.1-sources.jar!/org/apache/flinkable/sources/TableSource.scalatraitTableSource[T]{/**Returnsthe[[TypeInformation]]forthereturntypeoft
原文链接JobManager高可用性(HA)  作业管理器JobManager协调每个Flink部署组件,它负责调度以及资源管理。  默认情况下,每个Flink集群只有一个独立的JobManager实例,因此可能会产生单点故障(SPOF)。  使用JobManagerHighAvailability,可以从JobManager的故障中恢复,从而消除SPOF。
一、背景在flink本地环境安装完成之后,就想着怎么能调试和运行一个flink示例程序,本文记录下过程。二、获取flink源码通过如下命令,获取flink源码,在源码中有flink-examples模块,该模块中包含简单的SocketWindowWordCount.java示例程序。gitclonehttps://github.com/apache/
作为一家创新驱动的科技公司,袋鼠云每年研发投入达数千万,公司80%员工都是技术人员,袋鼠云产品家族包括企业级一站式数据中台PaaS数栈、交互式数据可视化大屏开发平台Easy[V]等产品也在迅速迭代。在进行产品研发的过程中,技术小哥哥们能文能武,不断提升产品性能和体验的同时,也把这些提
在阅读本文之前,请先阅读Flink原理与实现:Window机制,这篇文章从用户的角度,对Window做了比较详细的分析,而本文主要是从Flink框架的实现层面,对Window做另一个角度的分析。首先看一个比较简单的情况,假设我们在一个KeyedStream上做了一个10秒钟的tumblingprocessingtimewindow