聊聊flink的CsvTableSource

  序
  
  本文主要研究一下flink的CsvTableSource
  
  TableSource
  
  flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
  
  trait TableSource[T] {
  
  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
  
  * The fields of the return type are mapped to the table schema based on their name.
  
  *
  
  * @return The type of the returned [[DataSet]] or [[DataStream]].
  
  */
  
  def getReturnType: TypeInformation[T]
  
  /**
  
  * Returns the schema of the produced table.
  
  *
  
  * @return The [[TableSchema]] of the produced table.
  
  */
  
  def getTableSchema: TableSchema
  
  /**
  
  * Describes the table source.
  
  *
  
  * @return A String explaining the [[TableSource]].
  
  */
  
  def explainSource(): String =
  
  TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
  
  }
  
  TableSource定义了三个方法,分别是getReturnType、getTableSchema、explainSource
  
  BatchTableSource
  
  flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala
  
  trait BatchTableSource[T] extends TableSource[T] {
  
  /**
  
  * Returns the data of the table as a [[DataSet]].
  
  *
  
  * NOTE: This method is for internal use only for defining a [[TableSource]].
  
  * Do not use it in Table API programs.
  
  */
  
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
  
  }
  
  BatchTableSource继承了TableSource,它定义了getDataSet方法
  
  StreamTableSource
  
  flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala
  
  trait StreamTableSource[T] extends TableSource[T] {
  
  /**
  
  * Returns the data of the table as a [[DataStream]].
  
  *
  
  * NOTE: This method is for internal use only for defining a [[TableSource]].
  
  * Do not use it in Table API programs.
  
  */
  
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
  
  }
  
  StreamTableSource继承了TableSource,它定义了getDataStream方法
  
  CsvTableSource
  
  flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala
  
  class CsvTableSource private (
  
  private val path: String,
  
  private val fieldNames: Array[String],
  
  private val fieldTypes: Array[TypeInformation[_]],
  
  private val selectedFields: Array[Int],
  
  private val fieldDelim: String,
  
  private val rowDelim: String,
  
  private val quoteCharacter: Character,
  
  private val ignoreFirstLine: Boolean,
  
  private val ignoreComments: String,
  
  private val lenient: Boolean)
  
  extends BatchTableSource[Row]
  
  with StreamTableSource[Row]
  
  with ProjectableTableSource[Row] {
  
  def this(
  
  path: String,
  
  fieldNames: Array[String],
  
  fieldTypes: Array[TypeInformation[_]],
  
  fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
  
  rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
  
  quoteCharacter: Character = null,
  
  ignoreFirstLine: Boolean = false,
  
  ignoreComments: String = null,
  
  lenient: Boolean = false)www.michenggw.com = {
  
  this(
  
  path,
  
  fieldNames,
  
  fieldTypes,
  
  fieldTypes.indices.toArray, // initially, all fields are returned
  
  fieldDelim,
  
  rowDelim,
  
  quoteCharacter,
  
  ignoreFirstLine,
  
  ignoreComments,
  
  lenient)
  
  }
  
  def this(path: String, fieldNames: Array[String]www.fengshen157.com/, fieldTypes: Array[TypeInformation[_]]) = {
  
  this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
  
  CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  
  }
  
  if (fieldNames.length != fieldTypes.length) {
  
  throw new TableException("Number of field names and field types must be equal.")
  
  }
  
  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  
  private val selectedFieldNames = selectedFields.map(fieldNames(_))
  
  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
  
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
  
  execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  
  }
  
  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  
  override def getReturnType: www.leyouzaixian2.com RowTypeInfo = returnType
  
  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
  
  streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  
  }
  
  /** Returns the schema of the produced table. */
  
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
  
  /** Returns a copy of [[TableSource]] with ability to project fields */
  
  override def projectFields(fields: Array[Int]): CsvTableSource = {
  
  val selectedFields = if (fields.isEmpty) Array(0) else fields
  
  new CsvTableSource(
  
  path,
  
  fieldNames,
  
  fieldTypes,
  
  selectedFields,
  
  fieldDelim,
  
  rowDelim,
  
  quoteCharacter,
  
  ignoreFirstLine,
  
  ignoreComments,
  
  lenient)
  
  }
  
  private def createCsvInput(): RowCsvInputFormat = {
  
  val inputFormat = new RowCsvInputFormat(
  
  new Path(path),
  
  selectedFieldTypes,
  
  rowDelim,
  
  fieldDelim,
  
  selectedFields)
  
  inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
  
  inputFormat.setLenient(www.dasheng178.com lenient)
  
  if (quoteCharacter != null) {
  
  inputFormat.enableQuotedStringParsing(quoteCharacter)
  
  }
  
  if (ignoreComments != null) {
  
  inputFormat.setCommentPrefix(ignoreComments)
  
  }
  
  inputFormat
  
  }
  
  override def equals(other: Any): Boolean = other match {
  
  case that: CsvTableSource => returnType == that.returnType &&
  
  path == that.path &&
  
  fieldDelim == that.fieldDelim &&
  
  rowDelim == that.rowDelim &&
  
  quoteCharacter == that.quoteCharacter &&
  
  ignoreFirstLine == that.ignoreFirstLine &&
  
  ignoreComments == that.ignoreComments &&
  
  lenient == that.lenient
  
  case _ => false
  
  }
  
  override def hashCode(www.hengda157.com): Int = {
  
  returnType.hashCode()
  
  }
  
  override def explainSource(): String = {
  
  s"CsvTableSource(" +
  
  s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  
  }
  
  }
  
  CsvTableSource同时实现了BatchTableSource及StreamTableSource接口;getDataSet方法使用ExecutionEnvironment.createInput创建DataSet;getDataStream方法使用StreamExecutionEnvironment.createInput创建DataStream
  
  ExecutionEnvironment.createInput及StreamExecutionEnvironment.createInput接收的InputFormat为RowCsvInputFormat,通过createCsvInput创建而来
  
  getTableSchema方法返回的TableSchema通过fieldNames及fieldTypes创建;getReturnType方法返回的RowTypeInfo通过selectedFieldTypes及selectedFieldNames创建;explainSource方法这里返回的是CsvTableSource开头的字符串
  
  小结
  
  TableSource定义了三个方法,分别是getReturnType、getTableSchema、explainSource;BatchTableSource继承了TableSource,它定义了getDataSet方法;StreamTableSource继承了TableSource,它定义了getDataStream方法
  
  CsvTableSource同时实现了BatchTableSource及StreamTableSource接口;getDataSet方法使用ExecutionEnvironment.createInput创建DataSet;getDataStream方法使用StreamExecutionEnvironment.createInput创建DataStream
  
  ExecutionEnvironment.createInput及StreamExecutionEnvironment.createInput接收的InputFormat为RowCsvInputFormat,通过createCsvInput创建而来;getTableSchema方法返回的TableSchema通过fieldNames及fieldTypes创建;getReturnType方法返回的RowTypeInfo通过selectedFieldTypes及selectedFieldNames创建;explainSource方法这里返回的是CsvTableSource开头的字符串

原文地址:https://www.cnblogs.com/qwangxiao/p/10353179.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的处理数据是固定的离线计算是有延时的,T+1离线计算是数据处理完输出结果,只是输出最终结果离线计算相对可以处理复杂的计算1.2实时计算实时计算是实时的处理数据,数据从流入到计算出结果延迟低实时计算是输
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能:新增工作流新增OceanBaseSQL新增Flinkjar任务数据同步、实时采集支持脏数据管理HiveUDF控制台UI升级租户绑定简化新版本的使用文档已在社区中推送,大家可以随时下载查阅,欢迎大家体验新版本功能
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽。本文主要记录一下Java使用Flink的简单例子。首先,去官网下载Flink的zip包(链接就不提供了,你已经是个成熟的程序员了,该有一定的搜索能力了),解压后放到你想放的地方。进入主目录后,是这样子的 image.png你可以简
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasreproducedhereunderthe ApacheLicense,Version2.0.ThisblogpostprovidesanintroductiontoApacheFlink’sbuilt-inmonitoringandmetricssystem,thatallowsdeveloperstoeffectively
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下的flink-conf.yaml:经过初步的调整,大约有以下模块的参数(未优化)LicensedtotheApacheSoftwareFoundation(ASF)underoneormorecontributorlicenseagreements.SeetheNOTICEfiledistributedwiththis
1.mac平台安装flink(默认最新版)brewinstallapache-flink安装结果:Version1.7.1,commitID:89eafb42.jdk版本,我尝试使用了Java8和Java11,都能兼容3.在flink的安装目录下,启动flink目录一般默认在/usr/local/Cellar/apache-flink/1.7.1/(查找flink安装目录:find/-name
课程目标:学完该课程大家会对Flink有非常深入的了解,同时可以体会到Flink的强大之处,以及可以结合自己公司的业务进行使用,减少自己研究和学习Flink的时间。适合人群:适合有大数据开发基础和flink基础的同学。在开始学习前给大家说下什么是Flink? 1.Flink是一个针对流数据和批数据的
本文主要研究一下flink的NetworkEnvironmentConfigurationNetworkEnvironmentConfigurationflink-1.7.2/flink-runtime/src/main/java/org/apache/flinkuntimeaskmanager/NetworkEnvironmentConfiguration.javapublicclassNetworkEnvironmentCon
January22,2019 UseCases, ApacheFlinkLasseNedergaard   Recentlytherehasbeensignificantdiscussionaboutedgecomputingasamajortechnologytrendin2019.Edgecomputingbrings computingcapabilitiesawayfromthecloud,andrathercloset
1DataStreamAPI1.1DataStreamDataSources   source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。   flink提供了大量的已经实现好的source方法,可以自定义source   通过实现sourceFunction接口来
基于Flink流处理的动态实时亿级全端用户画像系统课程下载:https://pan.baidu.com/s/1YtMs-XG5-PsTFV9_7-AlfA提取码:639m项目中采用到的算法包含LogisticRegression、Kmeans、TF-IDF等,Flink暂时支持的算法比较少,对于以上算法,本课程将手把手带大家用Flink实现,并且结合真实场景,
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些
 flink集群安装部署 standalone集群模式 必须依赖必须的软件JAVA_HOME配置flink安装配置flink启动flink添加Jobmanageraskmanager实例到集群个人真实环境实践安装步骤 必须依赖必须的软件flink运行在所有类unix环境中,例如:linux、mac、或
1Flink的前世今生(生态很重要)很多人可能都是在2015年才听到Flink这个词,其实早在2008年,Flink的前身已经是柏林理工大学一个研究性项目,在2014被Apache孵化器所接受,然后迅速地成为了ASF(ApacheSoftwareFoundation)的顶级项目之一。   ApacheFlinkisanopensource
序本文主要研究一下flink的CsvTableSourceTableSourceflink-table_2.11-1.7.1-sources.jar!/org/apache/flinkable/sources/TableSource.scalatraitTableSource[T]{/**Returnsthe[[TypeInformation]]forthereturntypeoft
原文链接JobManager高可用性(HA)  作业管理器JobManager协调每个Flink部署组件,它负责调度以及资源管理。  默认情况下,每个Flink集群只有一个独立的JobManager实例,因此可能会产生单点故障(SPOF)。  使用JobManagerHighAvailability,可以从JobManager的故障中恢复,从而消除SPOF。
一、背景在flink本地环境安装完成之后,就想着怎么能调试和运行一个flink示例程序,本文记录下过程。二、获取flink源码通过如下命令,获取flink源码,在源码中有flink-examples模块,该模块中包含简单的SocketWindowWordCount.java示例程序。gitclonehttps://github.com/apache/
作为一家创新驱动的科技公司,袋鼠云每年研发投入达数千万,公司80%员工都是技术人员,袋鼠云产品家族包括企业级一站式数据中台PaaS数栈、交互式数据可视化大屏开发平台Easy[V]等产品也在迅速迭代。在进行产品研发的过程中,技术小哥哥们能文能武,不断提升产品性能和体验的同时,也把这些提
在阅读本文之前,请先阅读Flink原理与实现:Window机制,这篇文章从用户的角度,对Window做了比较详细的分析,而本文主要是从Flink框架的实现层面,对Window做另一个角度的分析。首先看一个比较简单的情况,假设我们在一个KeyedStream上做了一个10秒钟的tumblingprocessingtimewindow