[源码分析] 从FlatMap用法到Flink的内部实现

[源码分析] 从FlatMap用法到Flink的内部实现

0x00 摘要

本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。

0x01 Map vs FlatMap

首先我们先从概念入手。

自从响应式编程慢慢壮大以来,这两个单词现在越来越被大家熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。很多兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给大家简单讲解下。

map

它把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的数组流。然后返回这个新数据流。

flatMap

flat是扁平的意思。所以这个操作是:先映射(map),再拍扁(join)。

flatMap输入可能是多个子数组流。所以flatMap先针对 每个子数组流的每个元素进行映射操作。然后进行扁平化处理,最后汇集所有进行扁平化处理的结果集形成一个新的列表(扁平化简而言之就是去除所有的修饰)。

flatMap与map另外一个不一样的地方就是传入的函数在处理完后返回值必须是List。

实例

比如拿到一个文本文件之后,我们是按行读取,按行处理。如果要对每一行的单词数进行计数,那么应该选择Map方法,如果是统计词频,就应该选择flatMap方法。

如果还不清楚,可以看看下面这个例子:

梁山新进一批好马,准备给每个马军头领配置一批。于是得到函数以及头领名单如下:

函数 = ( 头领 => 头领 + 好马 )
五虎将 = List(关胜、林冲、秦明、呼延灼、董平 )
八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 )

// Map函数的例子
利用map函数,我们可以得到 五虎将马军

五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 )
结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 )

// flatMap函数的例子
但是为了得到统一的马军,则可以用flatMap:

马军头领 = List(五虎将,八骠骑)
马军 = 马军头领.flatMap( 头领 => 头领 + 好马 ) 

结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 )

现在大家应该清楚了吧。接下来看看几个FlatMap的实例。

Scala语言的实现

Scala本身对于List类型就有map和flatMap操作。举例如下:

val names = List("Alice","James","Apple")
val strings = names.map(x => x.toUpperCase)
println(strings)
// 输出 List(ALICE,JAMES,APPLE)

val chars = names.flatMap(x=> x.toUpperCase())
println(chars)
// 输出 List(A,L,I,C,E,J,A,M,S,P,E)

Flink的例子

以上是scala语言层面的实现。下面我们看看Flink框架是如何使用FlatMap的。

网上常见的一个Flink应用的例子:

//加载数据源
val source = env.fromElements("china is the best country","beijing is the capital of china")

//转化处理数据
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

Flink源码中的例子

case class WordWithCount(word: String,count: Long)

val text = env.socketTextStream(host,port,'\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w,1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

上面提到的都是简单的使用,如果有复杂需求,在Flink中,我们可以通过继承FlatMapFunction和RichFlatMapFunction来自定义算子。

函数类FlatMapFunction

对于不涉及到状态的使用,可以直接继承 FlatMapFunction,其定义如下:

@Public
@FunctionalInterface
public interface FlatMapFunction<T,O> extends Function,Serializable {
	void flatMap(T value,Collector<O> out) throws Exception;
}

如何自定义算子呢,这个可以直接看看Flink中的官方例子

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String,String> {
  @Override
  public void flatMap(String value,Collector<String> out) {
    for (String token : value.split("\\W")) {
      out.collect(token);
    }
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());

Rich函数类RichFlatMapFunction

对于涉及到状态的情况,用户可以使用继承 RichFlatMapFunction 类的方式来实现UDF。

RichFlatMapFunction属于Flink的Rich函数类。从名称上来看,这种函数类在普通的函数类上增加了Rich前缀,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函数类,Rich函数类增加了:

  • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
  • close()方法:Flink在算子最后一次调用结束后执行这个方法,可以用来释放一些资源。
  • getRuntimeContext方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据

FlatMap对应的RichFlatMapFunction如下:

@Public
public abstract class RichFlatMapFunction<IN,OUT> extends AbstractRichFunction implements FlatMapFunction<IN,OUT> {
	@Override
	public abstract void flatMap(IN value,Collector<OUT> out) throws Exception;
}

其基类 AbstractRichFunction 如下,可以看到主要是和运行时上下文建立了联系,并且有初始化和退出操作

@Public
public abstract class AbstractRichFunction implements RichFunction,Serializable {
  
	private transient RuntimeContext runtimeContext;

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		this.runtimeContext = t;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
			return this.runtimeContext;
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
    if (this.runtimeContext instanceof IterationRuntimeContext) {
			return (IterationRuntimeContext) this.runtimeContext;
		} 
	}

	@Override
	public void open(Configuration parameters) throws Exception {}

	@Override
	public void close() throws Exception {}
}

如何最好的使用? 当然还是官方文档和例子最靠谱。

因为涉及到状态,所以如果使用,你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。 但是我们也会看到一个例子。RichFunctionRuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一个 FlatMapFunction 的例子,展示了如何将这些部分组合起来:

class CountWindowAverage extends RichFlatMapFunction[(Long,Long),(Long,Long)] {

  private var sum: ValueState[(Long,Long)] = _

  override def flatMap(input: (Long,out: Collector[(Long,Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before,it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L,0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1,currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2,emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1,newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long,Long)]("average",createTypeInformation[(Long,Long)])
    )
  }
}

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L,3L),(1L,5L),7L),4L),2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

这个例子实现了一个简单的计数窗口。 我们把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

0x03 从Flink源码入手看FlatMap实现

FlatMap从Flink编程模型角度讲属于一个算子,用来对数据流或者数据集进行转换。从框架角度说,FlatMap是怎么实现的呢? 或者说FlatMap是怎么从用户代码转换到Flink运行时呢 ?

1. DataSet

首先说说 DataSet相关这套系统中FlatMap的实现。

请注意,DataSteam对应的那套系统中,operator名字都是带着Stream的,比如StreamOperator。

DataSet

val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 这段代码调用的就是DataSet中的API。具体如下:

public abstract class DataSet<T> {
  
	public <R> FlatMapOperator<T,R> flatMap(FlatMapFunction<T,R> flatMapper) {
    
		String callLocation = Utils.getCallLocationName();
    
		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper,getType(),callLocation,true);
		return new FlatMapOperator<>(this,resultType,clean(flatMapper),callLocation);
	}
}

FlatMapOperator

可以看出,flatMap @ DataSet 主要就是生成了一个FlatMapOperator,这个可以理解为是逻辑算子。其定义如下:

public class FlatMapOperator<IN,OUT> extends SingleInputUdfOperator<IN,OUT,FlatMapOperator<IN,OUT>> {

	protected final FlatMapFunction<IN,OUT> function;
	protected final String defaultName;

	public FlatMapOperator(DataSet<IN> input,TypeInformation<OUT> resultType,FlatMapFunction<IN,OUT> function,String defaultName) {
		super(input,resultType);
		this.function = function;
		this.defaultName = defaultName;
	}

	@Override
	protected FlatMapFunction<IN,OUT> getFunction() {
		return function;
	}

  // 这个translateToDataFlow就是生成计划(Plan)的关键代码
	@Override
	protected FlatMapOperatorBase<IN,OUT>> translateToDataFlow(Operator<IN> input) {
		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
		// create operator
		FlatMapOperatorBase<IN,OUT>> po = new FlatMapOperatorBase<IN,OUT>>(function,new UnaryOperatorInformation<IN,OUT>(getInputType(),getResultType()),name);
		// set input
		po.setInput(input);
		// set parallelism
		if (this.getParallelism() > 0) {
			// use specified parallelism
			po.setParallelism(this.getParallelism());
		} else {
			// if no parallelism has been specified,use parallelism of input operator to enable chaining
			po.setParallelism(input.getParallelism());
		}
		return po;
	}
}

FlatMapOperator的基类如下:

public abstract class SingleInputUdfOperator<IN,O extends SingleInputUdfOperator<IN,O>> extends SingleInputOperator<IN,O> implements UdfOperator<O> {

}

// Base class for operations that operates on a single input data set.
public abstract class SingleInputOperator<IN,O extends SingleInputOperator<IN,O>> extends Operator<OUT,O> {
  	private final DataSet<IN> input;
}

生成计划

DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成作业图(JobGraph)之前的实现差别很大。流处理程序是生成流图(StreamGraph),而批处理程序是生成计划(Plan)并由优化器对其进行优化并生成优化后的计划(OptimizedPlan)。

计划(Plan)以数据流(dataflow)的形式来表示批处理程序,但它只是批处理程序最初的表示,在一个批处理程序生成作业图之前,计划还会被进行优化以产生更高效的方案。Plan不同于流图(StreamGraph),它以sink为入口,因为一个批处理程序可能存在若干个sink,所以Plan采用集合来存储它。另外Plan还封装了批处理作业的一些基本属性:jobId、jobName以及defaultParallelism等。

生成Plan的核心部件是算子翻译器(OperatorTranslation),createProgramPlan方法通过它来”翻译“出计划,核心代码如下

public class OperatorTranslation {
  
   // 接收每个需遍历的DataSink对象,然后将其转换成GenericDataSinkBase对象
   public Plan translateToPlan(List<DataSink<?>> sinks,String jobName) {
       List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
       //遍历sinks集合
       for (DataSink<?> sink : sinks) {
            //将翻译生成的GenericDataSinkBase加入planSinks集合*,对每个sink进行”翻译“
            planSinks.add(translate(sink));
        }
       //以planSins集合构建Plan对象
       Plan p = new Plan(planSinks);
       p.setJobName(jobName);
       return p;
    }

	private <I,O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?,?,?> op) {
    //会调用到 FlatMapOperator 的 translateToDataFlow
	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    
  }
  
}

FlatMapOperatorBase就是生成的plan中的一员。

public class FlatMapOperatorBase<IN,FT extends FlatMapFunction<IN,OUT>> extends SingleInputOperator<IN,FT> {
	@Override
	protected List<OUT> executeOnCollections(List<IN> input,RuntimeContext ctx,ExecutionConfig executionConfig) throws Exception {
		FlatMapFunction<IN,OUT> function = userFunction.getUserCodeObject();
		
		FunctionUtils.setFunctionRuntimeContext(function,ctx);
		FunctionUtils.openFunction(function,parameters);

		ArrayList<OUT> result = new ArrayList<OUT>(input.size());

		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result,outSerializer);

		for (IN element : input) {
			IN inCopy = inSerializer.copy(element);
			function.flatMap(inCopy,resultCollector);
		}

		FunctionUtils.closeFunction(function);

		return result;
	}
}

而最后优化时候,则FlatMapOperatorBase会被优化成FlatMapNode。

public class GraphCreatingVisitor implements Visitor<Operator<?>> {
	public boolean preVisit(Operator<?> c) {
    else if (c instanceof FlatMapOperatorBase) {
			n = new FlatMapNode((FlatMapOperatorBase<?,?>) c);
		}
  }
}

自此,FlatMap就被组合到 DataSet的 OptimizedPlan 中。下一步Flink会依据OptimizedPlan来生成 JobGraph。

作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

在运行状态下,如果上游有数据流入,则FlatMap这个算子就会发挥作用。

2. DataStream

对于DataStream,则是另外一套体系结构。首先我们找一个使用DataStream的例子看看。

//获取数据: 从socket中获取
val textDataStream = env.socketTextStream("127.0.0.1",8888,'\n')
val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))

//groupby: 按照指定的字段聚合
val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
windowDstram.sum("count").print()

上面例子中,flatMap 调用的是DataStream中的API,具体如下:

public class DataStream<T> {
  
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T,R> flatMapper) {
    //clean函数用来移除FlatMapFunction类对象的外部类部分,这样就可以进行序列化
    //getType用来获取类对象的输出类型
		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),Utils.getCallLocationName(),true);
		return flatMap(flatMapper,outType);
	}
  
  // 构建了一个StreamFlatMap的Operator
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T,R> flatMapper,TypeInformation<R> outputType) {
		return transform("Flat Map",outputType,new StreamFlatMap<>(clean(flatMapper)));
	}  
  
  // 依次调用下去
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T,R> operator) {
		return doTransform(operatorName,outTypeInfo,SimpleOperatorFactory.of(operator));
	}
  
	protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,StreamOperatorFactory<R> operatorFactory) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
    // 构建Transform对象,Transform对象中包含其上游Transform对象,这样上游下游就串成了一个Transform链。
		OneInputTransformation<T,R> resultTransform = new OneInputTransformation<>(
				this.transformation,operatorName,operatorFactory,environment.getParallelism());
		@SuppressWarnings({"unchecked","rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment,resultTransform);
    // 将这Transform对象放入env的transform对象列表。
		getExecutionEnvironment().addOperator(resultTransform);
    // 返回流
		return returnStream;
	}  
}

上面源码中的几个概念需要澄清。

Transformation:首先,FlatMap在FLink编程模型中是算子API,在DataStream中会生成一个Transformation,即逻辑算子。

逻辑算子Transformation最后会对应到物理算子Operator,这个概念对应的就是StreamOperator

StreamOperator:DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。

processElement()方法也是UDF的逻辑被调用的地方,例如FlatMapFunction里的flatMap()方法。

public class StreamFlatMap<IN,OUT>
		extends AbstractUdfStreamOperator<OUT,OUT>>
		implements OneInputStreamOperator<IN,OUT> {

	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN,OUT> flatMapper) {
		super(flatMapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
    // 调用用户定义的FlatMap
		userFunction.flatMap(element.getValue(),collector);
	}
}

我们可以看到,StreamFlatMap继承了AbstractUdfStreamOperator,从而间接继承了StreamOperator。

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>,SetupableStreamOperator<OUT>,Serializable {
}

StreamOperator是根接口。对于 Streaming 来说所有的算子都继承自 StreamOperator。继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator。实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractUdfStreamOperator。

从 API 到 逻辑算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink会依据StreamOperator来生成 JobGraph。

作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

0x04 参考

Flink中richfunction的一点小作用

【浅显易懂】scala中map与flatMap的区别

Working with State

flink简单应用: scala编写wordcount

【Flink】Flink基础之实现WordCount程序(Java与Scala版本)

Flink进阶教程:以flatMap为例,如何进行算子自定义

Flink运行时之批处理程序生成计划

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。