[源码解析] Flink的groupBy和reduce究竟做了什么

[源码解析] Flink的groupBy和reduce究竟做了什么

0x00 摘要

Groupby和reduce是大数据领域常见的算子,但是很多同学应该对其背后机制不甚了解。本文将从源码入手,为大家解析Flink中Groupby和reduce的原理,看看他们在背后做了什么。

0x01 问题和概括

1.1 问题

探究的原因是想到了几个问题 :

  • groupby的算子会对数据进行排序嘛。
  • groupby和reduce过程中究竟有几次排序。
  • 如果有多个groupby task,什么机制保证所有这些grouby task的输出中,同样的key都分配给同一个reducer。
  • groupby和reduce时候,有没有Rebalance 重新分配。
  • reduce算子会不会重新划分task。
  • reduce算子有没有可能和前后的其他算子组成Operator Chain。

1.2 概括

为了便于大家理解,我们先总结下,对于一个Groupby + Reduce的操作,Flink做了如下处理:

  • Group其实没有真实对应的算子,它只是在在reduce过程之前的一个中间步骤或者辅助步骤。
  • 在Flink生成批处理执行计划后,有意义的结果是Reduce算子。
  • 为了更好的reduce,Flink在reduce之前大量使用了Combine操作。Combine可以理解为是在map端的reduce的操作,对单个map任务的输出结果数据进行合并的操作。
  • 在Flink生成批处理优化计划(Optimized Plan)之后,会把reduce分割成两段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
  • SORTED_PARTIAL_REDUCE就是Combine。
  • Flink生成JobGraph之后,Flink形成了一个Operator Chain:Reduce(SORTED_PARTIAL_REDUCE)和其上游合并在一起。
  • Flink用Partitioner来保证多个 grouby task 的输出中同样的key都分配给同一个reducer。
  • groupby和reduce过程中至少有三次排序:
    • combine
    • sort + merge
    • reduce

这样之前的疑问就基本得到了解释。

0x02 背景概念

2.1 MapReduce细分

MapReduce是一种编程模型,用于大规模数据集的并行运算。概念 "Map(映射)"和"Reduce(归约)" 是它们的主要思想,其是从函数式编程语言,矢量编程语言里借来的特性。

我们目前使用的Flink,Spark都出自于MapReduce,所以我们有必有追根溯源,看看MapReduce是如何区分各个阶段的。

2.2 MapReduce细分

如果把MapReduce细分,可以分为一下几大过程:

  • Input-Split(输入分片):此过程是将从HDFS上读取的文件分片,然后送给Map端。有多少分片就有多少Mapper,一般分片的大小和HDFS中的块大小一致。
  • Shuffle-Spill(溢写):每个Map任务都有一个环形缓冲区。一旦缓冲区达到阈值80%,一个后台线程便开始把内容“溢写”-“spill”到磁盘。在溢写过程中,map将继续输出到剩余的20%空间中,互不影响,如果缓冲区被填满map会被堵塞直到写磁盘完成。
  • Shuffle-Partition(分区):由于每个Map可能处理的数据量不同,所以到达reduce有可能会导致数据倾斜。分区可以帮助我们解决这一问题,在shuffle过程中会按照默认key的哈希码对分区数量取余,reduce便根据分区号来拉取对应的数据,达到数据均衡。分区数量对应Reduce个数。
  • Shuffle-Sort(排序):在分区后,会对此分区的数据进行内排序,排序过程会穿插在整个MapReduce中,在很多地方都存在。
  • Shuffle-Group(分组):分组过程会把key相同的value分配到一个组中,wordcount程序就利用了分组这一过程。
  • Shuffle-Combiner(组合):这一过程我们可以理解为一个小的Reduce阶段,当数据量大的时候可以在map过程中执行一次combine,这样就相当于在map阶段执行了一次reduce。由于reduce和map在不同的节点上运行,所以reduce需要远程拉取数据,combine就可以有效降低reduce拉取数据的量,减少网络负荷(这一过程默认是不开启的,在如求平均值的mapreduce程序中不要使用combine,因为会影响结果)。
  • Compress(压缩):在缓冲区溢写磁盘的时候,可以对数据进行压缩,节约磁盘空间,同样减少给reducer传递的数据量。
  • Reduce-Merge(合并):reduce端会拉取各个map输出结果对应的分区文件,这样reduce端就会有很多文件,所以在此阶段,reduce再次将它们合并/排序再送入reduce执行。
  • Output(输出):在reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS。

2.3 Combine

Combine是我们需要特殊注意的。在mapreduce中,map多,reduce少。在reduce中由于数据量比较多,所以我们干脆在map阶段中先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。

Combine可以理解为是在map端的reduce的操作,对单个map任务的输出结果数据进行合并的操作。combine是对一个map的,而reduce合并的对象是对于多个map

map函数操作所产生的键值对会作为combine函数的输入,经combine函数处理后再送到reduce函数进行处理,减少了写入磁盘的数据量,同时也减少了网络中键值对的传输量。在Map端,用户自定义实现的Combine优化机制类Combiner在执行Map端任务的节点本身运行,相当于对map函数的输出做了一次reduce。

集群上的可用带宽往往是有限的,产生的中间临时数据量很大时就会出现性能瓶颈,因此应该尽量避免Map端任务和Reduce端任务之间大量的数据传输。使用Combine机制的意义就在于使Map端输出更紧凑,使得写到本地磁盘和传给Reduce端的数据更少。

2.4 Partition

Partition是分割map每个节点的结果,按照key分别映射给不同的reduce,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。

这里其实可以理解归类。我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过是在写程序的时候,

在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。假如我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce task去做呢,是需要立刻决定的。

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce task的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

在我们的例子中,假定 “aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。

2.5 Shuffle

shuffle就是map和reduce之间的过程,包含了两端的combine和partition。它比较难以理解,因为我们摸不着,看不到它。它属于mapreduce的框架,编程的时候,我们用不到它。

Shuffle的大致范围就是:怎样把map task的输出结果有效地传送到reduce端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。

2.6 Reducer

简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,最终形成一个文件作为reduce task的输入文件。

0x03 代码

我们以Flink的KMeans算法作为样例,具体摘要如下:

public class WordCountExampleReduce {

    DataStream ds;

    public static void main(String[] args) throws Exception {
        //构建环境
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        //通过字符串构建数据集
        DataSet<String> text = env.fromElements(
                "Who‘s there?","I think I hear them. Stand,ho! Who‘s there?");
        //分割字符串、按照key进行分组、统计相同的key个数
        DataSet<Tuple2<String,Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String,Integer>>() {
                    @Override
                    public Tuple2<String,Integer> reduce(Tuple2<String,Integer> value1,Tuple2<String,Integer> value2) throws Exception {
                        return new Tuple2(value1.f0,value1.f1 + value2.f1);
                    }
                });
        //打印
        wordCounts.print();
    }
    //分割字符串的方法
    public static class LineSplitter implements FlatMapFunction<String,Integer>> {
        @Override
        public void flatMap(String line,Collector<Tuple2<String,Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String,Integer>(word,1));
            }
        }
    }
}

输出是:

(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,1)
(Who‘s,2)
(there?,2)
(think,1)

首先,我们从Flink基本JAVA API来入手开始挖掘。

4.1 GroupBy是个辅助概念

4.1.1 Grouping

我们需要留意的是:GroupBy并没有对应的Operator。GroupBy只是生成DataSet转换的一个中间步骤或者辅助步骤

GroupBy功能的基类是Grouping,其只是DataSet转换的一个中间步骤。其几个主要成员是:

  • 对应的输入数据DataSet
  • 分组所基于的keys
  • 用户自定义的Partitioner
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
   protected final DataSet<T> inputDataSet;
   protected final Keys<T> keys;
   protected Partitioner<?> customPartitioner;
}

Grouping并没有任何业务相关的API,具体API都是在其派生类中,比如UnsortedGrouping。

4.1.2 UnsortedGrouping

我们代码中对应的就是UnsortedGrouping类。我们看到它提供了很多业务API,比如:sum,max,min,reduce,aggregate,reduceGroup,combineGroup.....

回到我们的示例,groupBy做了如下操作

  • 首先,groupBy返回的就是一个UnsortedGrouping,这个UnsortedGrouping是用来转换DataSet。
  • 其次,.groupBy(0).reduce(new CentroidAccumulator()) 返回的是ReduceOperator。这就对应了前面我们提到的,groupBy只是中间步骤,reduce才能返回一个Operator
public class UnsortedGrouping<T> extends Grouping<T> {
  
    // groupBy返回一个UnsortedGrouping
    public UnsortedGrouping<T> groupBy(int... fields) {
       return new UnsortedGrouping<>(this,new Keys.ExpressionKeys<>(fields,getType()));
    }
  
    // reduce返回一个ReduceOperator
 		public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
      return new ReduceOperator<T>(this,inputDataSet.clean(reducer),Utils.getCallLocationName());
    } 
}

4.2 reduce才是算子

对于业务来说,reduce才是真正有意义的逻辑算子。

从前文的函数调用和ReduceOperator定义可以看出,.groupBy(0).reduce() 的调用结果是生成一个ReduceOperator,而 UnsortedGrouping 被设置为 ReduceOperator 的 grouper 成员变量,作为辅助操作

public class ReduceOperator<IN> extends SingleInputUdfOperator<IN,IN,ReduceOperator<IN>> {
  
	private final ReduceFunction<IN> function;
	private final Grouping<IN> grouper; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。

	public ReduceOperator(Grouping<IN> input,ReduceFunction<IN> function,String defaultName) {
		this.function = function;
		this.grouper = input; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。
    this.hint = CombineHint.OPTIMIZER_CHOOSES; // 优化时候会用到。
	}
}

让我们顺着Flink程序执行阶段继续看看系统都做了些什么。

0x05 批处理执行计划(Plan)

程序执行的第一步是:当程序运行时候,首先会根据java API的结果来生成执行plan。

public JobClient executeAsync(String jobName) throws Exception {
   final Plan plan = createProgramPlan(jobName);
} 

其中重要的函数是translateToDataFlow,因为在translateToDataFlow方法中,会从批处理Java API模块中operators包往核心模块中operators包的转换

对于我们的示例程序,在生成 Graph时,translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用。下面是代码缩减版。

protected org.apache.flink.api.common.operators.SingleInputOperator<?,?> translateToDataFlow(Operator<IN> input) {
    
    ......
      
    // UnsortedGrouping中的keys被取出,  
		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {

			// reduce with field positions
			ReduceOperatorBase<IN,ReduceFunction<IN>> po =
					new ReduceOperatorBase<>(function,operatorInfo,logicalKeyPositions,name);

			po.setCustomPartitioner(grouper.getCustomPartitioner());
			po.setInput(input);
			po.setParallelism(getParallelism()); // 没有并行度的变化

			return po;//translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用
		}	    
  }  
}

我们代码最终生成的执行计划如下,我们可以看出来,执行计划基本符合我们的估计:简单的从输入到输出。中间有意义的算子其实只有Reduce

GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase

具体在代码中体现如下是:

plan = {Plan@1296} 
 sinks = {ArrayList@1309}  size = 1
  0 = {GenericDataSinkBase@1313} "collect()"
   formatWrapper = {UserCodeObjectWrapper@1315} 
   input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
    hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
    customPartitioner = null
    input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
     input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"

0x06 批处理优化计划(Optimized Plan)

程序执行的第二步是:Flink对于Plan会继续优化,生成Optimized Plan。其核心代码位于PlanTranslator.compilePlan 函数,这里得到了Optimized Plan。

这个编译的过程不作任何决策与假设,也就是说作业最终如何被执行早已被优化器确定,而编译也是在此基础上做确定性的映射。所以我们将集中精力看如何优化plan。

private JobGraph compilePlan(Plan plan,Configuration optimizerConfiguration) {
   Optimizer optimizer = new Optimizer(new DataStatistics(),optimizerConfiguration);
   OptimizedPlan optimizedPlan = optimizer.compile(plan);

   JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
   return jobGraphGenerator.compileJobGraph(optimizedPlan,plan.getJobId());
}

在内部调用plan的accept方法遍历它。accept会挨个在每个sink上调用accept。对于每个sink会先preVisit,然后 postVisit。

这里优化时候有几个注意点:

  1. 在 GraphCreatingVisitor.preVisit 中,当发现Operator是 ReduceOperatorBase 类型的时候,会建立ReduceNode。

    else if (c instanceof ReduceOperatorBase) {
       n = new ReduceNode((ReduceOperatorBase<?,?>) c);
    }
    
  2. ReduceNode是Reducer Operator的Optimizer表示。

    public class ReduceNode extends SingleInputNode {
    	private final List<OperatorDescriptorSingle> possibleProperties;	
    	private ReduceNode preReduceUtilityNode;
    }
    
  3. 生成ReduceNode时候,会根据之前提到的 hint 来决定 combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;

    public ReduceNode(ReduceOperatorBase<?,?> operator) {
    			DriverStrategy combinerStrategy;
    			switch(operator.getCombineHint()) {
    				case OPTIMIZER_CHOOSES:
    					combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
    					break;
          }  
    }
    

生成的优化执行计划如下,我们可以看到,这时候设置了并行度,也把reduce分割成两段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE

Data Source  ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE)   ——> Reduce(SORTED_REDUCE)  ——> Data Sink

具体在代码中体现如下是:

optimizedPlan = {OptimizedPlan@1506} 
 
 allNodes = {HashSet@1510}  size = 5
   
  0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,grouped=null,unique=null] ]]"
   parallelism = 4

  1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,unique=null] ]]"
   parallelism = 4

  2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,unique=null] ]]"
   parallelism = 4

  3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,unique=null] ]]"
   parallelism = 4

  4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,unique=null] ]]"
   parallelism = 4

0x07 JobGraph

程序执行的第三步是:建立JobGraph。LocalExecutor.execute中会生成JobGraph。Optimized Plan 经过优化后生成了 JobGraph, JobGraph是提交给 JobManager 的数据结构。

主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

JobGraph是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一

public CompletableFuture<JobClient> execute(Pipeline pipeline,Configuration configuration) throws Exception {
   final JobGraph jobGraph = getJobGraph(pipeline,configuration);
}

我们可以看出来,这一步形成了一个Operator Chain:

CHAIN DataSource -> FlatMap -> Combine (Reduce) 

于是我们看到,Reduce(SORTED_PARTIAL_REDUCE)和其上游合并在一起

具体在程序中打印出来:

jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
 taskVertices = {LinkedHashMap@1742}  size = 3
  
  {JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
  
  {JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
  
  {JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"

0x08 Runtime

Job提交之后,就是程序正式运行了。这里实际上涉及到了三次排序,

  • 一次是在FlatMap发送时候调用到了ChainedReduceCombineDriver.sortAndCombine。这部分对应了我们之前提到的MapReduce中的Combine和Partition。
  • 一次是在 ReduceDriver 所在的 BatchTask中,由UnilateralSortMerger完成了sort & merge操作。
  • 一次是在ReduceDriver,这里做了最后的reducer排序。

8.1 FlatMap

这里是第一次排序

当一批数据处理完成之后,在ChainedFlatMapDriver中调用到close函数进行发送数据给下游。

public void close() {
   this.outputCollector.close();
}

Operator Chain会调用到ChainedReduceCombineDriver.close

public void close() {
   // send the final batch
   try {
      switch (strategy) {
         case SORTED_PARTIAL_REDUCE:
            sortAndCombine(); // 我们是在这里
            break;
         case HASHED_PARTIAL_REDUCE:
            reduceFacade.emit();
            break;
      }
   } catch (Exception ex2) {
      throw new ExceptionInChainedStubException(taskName,ex2);
   }

   outputCollector.close();
   dispose(false);
}

8.1.1 Combine

sortAndCombine中先排序,然后做combine,最后会不断发送数据

private void sortAndCombine() throws Exception {
   final InMemorySorter<T> sorter = this.sorter;

   if (!sorter.isEmpty()) {
      sortAlgo.sort(sorter); // 这里会先排序

      final TypeSerializer<T> serializer = this.serializer;
      final TypeComparator<T> comparator = this.comparator;
      final ReduceFunction<T> function = this.reducer;
      final Collector<T> output = this.outputCollector;
      final MutableObjectIterator<T> input = sorter.getIterator();

      if (objectReuseEnabled) {
        ......
      } else {
         T value = input.next();

         // 这里就是combine
         // iterate over key groups
         while (running && value != null) {
            comparator.setReference(value);
            T res = value;

            // iterate within a key group
            while ((value = input.next()) != null) {
               if (comparator.equalToReference(value)) {
                  // same group,reduce
                  res = function.reduce(res,value);
               } else {
                  // new key group
                  break;
               }
            }

            output.collect(res); //发送数据
         }
      }
   }
}

8.1.2 Partition

最后发送给哪个下游,是由OutputEmitter.selectChannel决定的。有如下几种决定方式:

hash-partitioning,broadcasting,round-robin,custom partition functions。这里采用的是PARTITION_HASH。

每个task都会把同样字符串统计结果发送给同样的下游ReduceDriver。这就保证了下游Reducer一定不会出现统计出错。

public final int selectChannel(SerializationDelegate<T> record) {
   switch (strategy) {
   ...
   case PARTITION_HASH:
      return hashPartitionDefault(record.getInstance(),numberOfChannels);
   ...
   }
}

private int hashPartitionDefault(T record,int numberOfChannels) {
	int hash = this.comparator.hash(record);
	return MathUtils.murmurHash(hash) % numberOfChannels;
}

具体调用栈:

hash:50,TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30,TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187,OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147,OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36,OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60,ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65,OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35,CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254,ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266,ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40,CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88,ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215,DataSourceTask (org.apache.flink.runtime.operators)
doRun:707,Task (org.apache.flink.runtime.taskmanager)
run:532,Task (org.apache.flink.runtime.taskmanager)
run:748,Thread (java.lang)

8.2 UnilateralSortMerger

这里是第二次排序

在 BatchTask中,会先Sort,Merge输入,然后才会交由Reduce来具体完成过。sort & merge操作具体是在UnilateralSortMerger类中完成的。

getIterator:646,UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110,BatchTask (org.apache.flink.runtime.operators)
prepare:95,ReduceDriver (org.apache.flink.runtime.operators)
run:474,BatchTask (org.apache.flink.runtime.operators)
invoke:369,BatchTask (org.apache.flink.runtime.operators)
doRun:707,Thread (java.lang)

UnilateralSortMerger是一个full fledged sorter,它实现了一个多路merge sort。其内部的逻辑被划分到三个线程上(read,sort,spill),这三个线程彼此之间通过一系列blocking queues来构成了一个闭环。

其内存通过MemoryManager分配,所以这个组件不会超过给其分配的内存。

该类主要变量摘录如下:

public class UnilateralSortMerger<E> implements Sorter<E> {
	// ------------------------------------------------------------------------
	//                                  Threads
	// ------------------------------------------------------------------------

	/** The thread that reads the input channels into buffers and passes them on to the merger. */
	private final ThreadBase<E> readThread;

	/** The thread that merges the buffer handed from the reading thread. */
	private final ThreadBase<E> sortThread;

	/** The thread that handles spilling to secondary storage. */
	private final ThreadBase<E> spillThread;
	
	// ------------------------------------------------------------------------
	//                                   Memory
	// ------------------------------------------------------------------------
	
	/** The memory segments used first for sorting and later for reading/pre-fetching
	 * during the external merge. */
	protected final List<MemorySegment> sortReadMemory;
	
	/** The memory segments used to stage data to be written. */
	protected final List<MemorySegment> writeMemory;
	
	/** The memory manager through which memory is allocated and released. */
	protected final MemoryManager memoryManager;
	
	// ------------------------------------------------------------------------
	//                            Miscellaneous Fields
	// ------------------------------------------------------------------------
	/**
	 * Collection of all currently open channels,to be closed and deleted during cleanup.
	 */
	private final HashSet<FileIOChannel> openChannels;
	
	/**
	 * The monitor which guards the iterator field.
	 */
	protected final Object iteratorLock = new Object();
	
	/**
	 * The iterator to be returned by the sort-merger. This variable is null,while receiving and merging is still in
	 * progress and it will be set once we have &lt; merge factor sorted sub-streams that will then be streamed sorted.
	 */
	protected volatile MutableObjectIterator<E> iterator; 	// 如果大家经常调试,就会发现driver中的input都是这个兄弟。

	private final Collection<InMemorySorter<?>> inMemorySorters;
}

8.2.1 三种线程

ReadingThread:这种线程持续读取输入,然后把数据放入到一个待排序的buffer中。The thread that consumes the input data and puts it into a buffer that will be sorted.

SortingThread : 这种线程对于上游填充好的buffer进行排序。The thread that sorts filled buffers.

SpillingThread:这种线程进行归并操作。The thread that handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.

8.2.2 MutableObjectIterator

UnilateralSortMerger有一个特殊变量:

protected volatile MutableObjectIterator<E> iterator;

这个变量就是最终sort-merger的输出。如果大家调试过算子,就会发现这个变量就是具体算子的输入input类型。最终算子的输入就是来自于此。

8.3 ReduceDriver

这里是第三次排序,我们可以看出来reduce是怎么和groupby一起运作的。

  1. 针对 .groupBy(0),ReduceDriver就是单纯获取输入的第一个数值 T value = input.next();
  2. 后续代码中有嵌套的两个while,分别是 :遍历各种key,以及某一key中reduce。
  3. 遍历 group keys的时候,把value赋于比较算子comparator(这个算子概念不是Flink算子,就是为了说明逻辑概念) comparator.setReference(value); 因为groubBy只是指定按照第一个位置比较,没有指定具体key数值,所以这个value就是key了。此处记为while (1) ,代码中有注解。
  4. 从输入中读取后续的数值value,如果下一个数值是同一个key,就reduce;如果下一个数值不是同一个key,就跳出循环。放弃比较,把reduce结果输出。此处记为 while (2)
  5. 跳出 while (2) 之后,代码依然在 while (1) ,此时value是新值,所以继续在 while (1)中运行 。把value继续赋于比较算子 comparator.setReference(value);,于是进行新的key比较
public class ReduceDriver<T> implements Driver<ReduceFunction<T>,T> {
	@Override
	public void run() throws Exception {

		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

		// cache references on the stack
		final MutableObjectIterator<T> input = this.input;
		final TypeSerializer<T> serializer = this.serializer;
		final TypeComparator<T> comparator = this.comparator;		
		final ReduceFunction<T> function = this.taskContext.getStub();		
		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(),numRecordsOut);

		if (objectReuseEnabled) {
      ......
		} else {
      // 针对 `.groupBy(0)`,ReduceDriver就是单纯获取输入的第一个数值 `T value = input.next();`
			T value = input.next();

      // while (1)
			// iterate over key groups
			while (this.running && value != null) {
				numRecordsIn.inc();
        // 把value赋于比较算子,这个value就是key了。
				comparator.setReference(value);
				T res = value;

        // while (2)
				// iterate within a key group,循环比较这个key
				while ((value = input.next()) != null) {
					numRecordsIn.inc();
					if (comparator.equalToReference(value)) {
						// same group,reduce,如果下一个数值是同一个key,就reduce
						res = function.reduce(res,value);
					} else {
						// new key group,如果下一个数值不是同一个key,就跳出循环,放弃比较。
						break;
					}
				}
        // 把reduce结果输出
				output.collect(res);
			}
		}
	}  
}

0x09 参考

mapreduce里的shuffle 里的 sort merge 和combine

实战录 | Hadoop Mapreduce shuffle之Combine探讨

Hadoop中MapReduce中combine、partition、shuffle的作用是什么?在程序中怎么运用?

Flink运行时之生成作业图

mapreduce过程

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。