微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink基本API概念

参考链接:https://flink.sojb.cn/dev/api_concepts.html#anatomy-of-a-flink-program

1. DataSet and DataStream

表示Flink app中的分布式数据集
包含重复的、不可变数据集
DataSet有界、DataStream可以是无界
可以从数据源、也可以通过各种转换操作创建

2、Flink编程套路

获得一个execution environment,
加载/创建初始数据,
指定此数据的转换,
指定放置计算结果的位置,
触发程序执行

3、惰性计算

Flink APP都是延迟执行的
只有当execute()被显示调用时才会真正执行
本地执行还是在集群上执行取决于执行环境的类型
好处:用户可以根据业务构建复杂的应用,Flink可以整体进优化并生成执行计划

4、指定Keys
哪些操作需要指定key
join
coGroup
keyBy
groupBy
Reduce
GroupReduce
Aggregate
Windows

Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。
键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。,可以在具体算子通过参数指定:

DataSet被分组为

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);
虽然可以使用DataStream指定Keys
DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);
  
5、为Tuple定义键
按照指定属性分组
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:此时表示使用Tuple3三元组的第一个成员作为keyBy
按照组合键进行分组
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
注意:此时表示使用Tuple3三元组的前两个元素一起作为keyBy
特殊情况:嵌套Tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:这里使用KeyBy(0)指定键,系统将会使用整个Tuple2作为键(整型和浮点型的)。

6、使用字段表达式定义键

基于字符串的字段表达式可以用来引用嵌套字段(例如Tuple,POJO)
public class WC {
public String word;
public User user;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
备注:通过word字段进行分组

语法
直接使用字段名选择POJO字段
例如 user 表示 一个POJO的user字段
Tuple通过offset来选择
"_1"和"5"分别代表第一和第六个Scala Tuple字段
“f0‖ and ―5‖分别代表第一和第六个Java Tuple字段
选择POJO和Tuples的嵌套属性
user.zip表示使用user对象的zip字段作为key

字段表达示例:
public static class WC {
  public ComplexnestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexnestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}
这些是上面示例代码的有效字段表达式:

"count":类中的count字段WC。

"complex":递归选择POJO类型的字段复合体的所有字段ComplexnestedClass。

"complex.word.f2":选择嵌套的最后一个字段Tuple3。

"complex.hadoopCitizen":选择Hadoop IntWritable类型。

7、指定转换函数

方式一:实现接口

最基本的方法是实现一个提供的接口:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());

方式二:匿名类

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

方式三:Java 8 Lambdas

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

方式四:Rich Functions

需要用户定义函数的所有转换都可以将富函数作为参数。例如,而不是

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
你可以写

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一样将函数传递给map转换:

data.map(new MyMapFunction());
丰富的函数也可以定义为匿名类:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

非常有用的四个方法:open,close,getRuntimeContext和setRuntimecontext 这些功能在参数化函数、创建和确定本地状态、获取广播变量、获取运行时信息(例如 累加器和计数器)和迭代信息时非常有帮助

8、Flink支持的数据类型

Flink对DataSet和DataStream中可使用的元素类型添加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略和选择不同的序列化方式。 常用的几种数据类型:
Java Tuple 和 Scala Case类;
Java POJO;
基本类型;
通用类;
值;
Hadoop Writables;

Tuple是包含固定数量各种类型字段的复合类
Flink Java API提供了Tuple1-Tuple25
Tuple的字段可以是Flink的任意类型,甚至嵌套Tuple
访问Tuple属性的方式
属性名(f0,f1…fn)
getField(int pos)

Java Tuple:

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

Scala案例类(和Scala元组是案例类的特例)是包含固定数量的具有各种类型的字段的复合类型。元组字段通过其1偏移名称来寻址,例如_1第一个字段。案例类字段按名称访问。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs:
Java和Scala的类在满足下列条件时将会被Flink视作特殊的POJO数据类型专门进行处理:
是公共类;
无参构造是公共的;
所有的属性都是可获得的(声明为公共的,或提供get,set方法);
字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象
Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");

基本类型和一般通用类:
基本类型
Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double
一般通用类
Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。
所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

值类型Values和Hadoop的Writable类:
值类型Values
通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架
Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。
Hadoop的Writable类
实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐