[源码解析]为什么mapPartition比map更高效

[源码解析]为什么mapPartition比map更高效

0x00 摘要

自从函数式编程和响应式编程逐渐进入到程序员的生活之后,map函数作为其中一个重要算子也为大家所熟知,无论是前端web开发,手机开发还是后端服务器开发,都很难逃过它的手心。而在大数据领域中又往往可以见到另外一个算子mapPartition的身影。在性能调优中,经常会被建议尽量用 mappartition 操作去替代 map 操作。本文将从Flink源码和示例入手,为大家解析为什么mapPartition比map更高效。

0x01 map vs mapPartition

1.1 map

Map的作用是将数据流上每个元素转换为另外的元素,比如data.map { x => x.toInt }。它把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的数组流。然后返回这个新数据流。

1.2 mapPartition

MapPartition的作用是单个函数调用并行分区,比如data.mapPartition { in => in map { (_,1) } }。该函数将分区作为“迭代器”,可以产生任意数量的结果。每个分区中的元素数量取决于并行度和以前的operations。

1.3 异同

其实,两者完成的业务操作是一样的,本质上都是将数据流上每个元素转换为另外的元素。

区别主要在两点。

从逻辑实现来讲

  • map逻辑实现简单,就是在函数中简单一一转换,map函数的输入和输入都是单个元素。
  • mapPartition相对复杂,函数的输入有两个,一般格式为 void mapPartition(Iterable<T> values,Collector<O> out) 。其中values是需要映射转换的所有记录,out是用来发送结果的collector。具体返回什么,如何操作out来返回结果,则完全依赖于业务逻辑。

从调用次数来说

  • 数据有多少个元素,map就会被调用多少次。
  • 数据有多少分区,mapPartition就会被调用多少次。

为什么MapPartition有这么高效呢,下面我们将具体论证。

0x02 代码

首先我们给出示例代码,从下文中我们可以看出,map就是简单的转换,而mapPartition则不但要做转换,程序员还需要手动操作如何返回结果:

public class IteratePi {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //迭代次数
        int iterativeNum=10;
        DataSet<Integer> wordList = env.fromElements(1,2,3);
      
        IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum);
        DataSet<Integer> mapResult=iterativeDataSet
          			.map(new MapFunction<Integer,Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                value += 1;
                return value;
            }
        });
        //迭代结束的条件
        DataSet<Integer> result=iterativeDataSet.closeWith(mapResult);
        result.print();

        MapPartitionOperator<Integer,Integer> mapPartitionResult = iterativeDataSet
                .mapPartition(new MapPartitionFunction<Integer,Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values,Collector<Integer> out) {
                for (Integer value : values) {
                    // 这里需要程序员自行决定如何返回,即调用collect操作。
                    out.collect(value + 2);
                }
            }                                                                                                                           					}
        );
        //迭代结束的条件
        DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult);
        partitionResult.print();
    }
}

0x03 Flink的传输机制

世界上很少有没有来由的爱,也少见免费的午餐。mapPartition之所以高效,其所依赖的基础就是Flink的传输机制。所以我们下面就讲解下为什么。

大家都知道,Spark是用微批处理来模拟流处理,就是说,spark还是一批一批的传输和处理数据,所以我们就能理解mapPartition的机制就是基于这一批数据做统一处理。这样确实可以高效。

但是Flink号称是纯流,即Flink是每来一个输入record,就进行一次业务处理,然后返回给下游算子。

有的兄弟就会产生疑问:每次都只是处理单个记录,怎么能够让mapPartition做到批次处理呢。其实这就是Flink的微妙之处:即Flink确实是每次都处理一个输入record,但是在上下游传输时候,Flink还是把records累积起来做批量传输的。也可以这么理解:从传输的角度讲,Flink是微批处理的

3.1 传输机制概述

Flink 的网络栈是组成 flink-runtime 模块的核心组件之一,也是 Flink 作业的核心部分。所有来自 TaskManager 的工作单元(子任务)都通过它来互相连接。流式传输数据流都要经过网络栈,所以它对 Flink 作业的性能表现(包括吞吐量和延迟指标)至关重要。与通过 Akka 使用 RPC 的 TaskManager 和 JobManager 之间的协调通道相比,TaskManager 之间的网络栈依赖的是更底层的,基于 Netty 的 API。

3.2 远程通信

一个运行的application的tasks在持续交换数据。TaskManager负责做数据传输。不同任务之间的每个(远程)网络连接将在 Flink 的网络栈中获得自己的 TCP 通道。但是如果同一任务的不同子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将被多路复用,并共享一个 TCP 信道以减少资源占用。

每个TaskManager有一组网络缓冲池(默认每个buffer是32KB),用于发送与接受数据。如发送端和接收端位于不同的TaskManager进程中,则它们需要通过操作系统的网络栈进行交流。流应用需要以管道的模式进行数据交换,也就是说,每对TaskManager会维持一个永久的TCP连接用于做数据交换。在shuffle连接模式下(多个sender与多个receiver),每个sender task需要向每个receiver task发送数据,此时TaskManager需要为每个receiver task都分配一个缓冲区。

一个记录被创建并传递之后(例如通过 Collector.collect()),它会被递交到RecordWriter,其将来自 Java 对象的记录序列化为一个字节序列,后者最终成为网络缓存。RecordWriter 首先使用SpanningRecordSerializer将记录序列化为一个灵活的堆上字节数组。然后它尝试将这些字节写入目标网络通道的关联网络缓存。

因为如果逐个发送会降低每个记录的开销并带来更高的吞吐量,所以为了取得高吞吐量,TaskManager的网络组件首先从缓冲buffer中收集records,然后再发送。也就是说,records并不是一个接一个的发送,而是先放入缓冲,然后再以batch的形式发送。这个技术可以高效使用网络资源,并达到高吞吐。类似于网络或磁盘 I/O 协议中使用的缓冲技术。

接收方网络栈(netty)将接收到的缓存写入适当的输入通道。最后(流式)任务的线程从这些队列中读取并尝试在RecordReader的帮助下,通过Deserializer将积累的数据反序列化为 Java 对象。

3.3 TaskManager进程内传输

若sender与receiver任务都运行在同一个TaskManager进程,则sender任务会将发送的条目做序列化,并存入一个字节缓冲。然后将缓冲放入一个队列,直到队列被填满。

Receiver任务从队列中获取缓冲,并反序列化输入的条目。所以,在同一个TaskManager内,任务之间的数据传输并不经过网络交互。

在同一个TaskManager进程内,也是批量传输

3.4 源码分析

我们基于Flink优化的结果进行分析验证,看看Flink是不是把记录写入到buffer中,这种情况下运行的是CountingCollector和ChainedMapDriver。

copyFromSerializerToTargetChannel:153,RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:116,RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:60,ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65,OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35,CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79,ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35,CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196,DataSourceTask (org.apache.flink.runtime.operators)
doRun:707,Task (org.apache.flink.runtime.taskmanager)
run:532,Task (org.apache.flink.runtime.taskmanager)
run:748,Thread (java.lang)

当执行完用户定义的map函数之后,系统运行在 ChainedMapDriver.collect 函数。

public void collect(IT record) {
    this.outputCollector.collect(this.mapper.map(record));// mapper就是用户代码
}

然后调用到了CountingCollector.collect

public void collect(OUT record) {
		this.collector.collect(record);// record就是用户转换后的记录
}

OutputCollector.collect函数会把记录发送给所有的writers。

this.delegate.setInstance(record);// 先把record设置到SerializationDelegate中
for (RecordWriter<SerializationDelegate<T>> writer : writers) {  // 所有的writer
   writer.emit(this.delegate); // 发送record
}

RecordWriter负责把数据序列化,然后写入到缓存中。它有两个实现类:

  • BroadcastRecordWriter: 维护了多个下游channel,发送数据到下游所有的channel中。
  • ChannelSelectorRecordWriter: 通过channelSelector对象判断数据需要发往下游的哪个channel。我们用的正是这个RecordWriter

这里我们分析下ChannelSelectorRecordWriteremit方法:

public void emit(T record) throws IOException,InterruptedException {
   emit(record,channelSelector.selectChannel(record));
}

这里使用了channelSelector.selectChannel方法。该方法为record寻找到对应下游channel id。

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
	public final int selectChannel(SerializationDelegate<T> record) {
		switch (strategy) {
		case FORWARD:
			return forward(); // 我们代码用到了这种情况。这里 return 0;
    ......
		}
	}
}

接下来我们又回到了父类RecordWriter.emit

protected void emit(T record,int targetChannel) throws IOException,InterruptedException {
   serializer.serializeRecord(record);
   // Make sure we don't hold onto the large intermediate serialization buffer for too long
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      serializer.prune();
   }
}

关键的逻辑在于copyFromSerializerToTargetChannel此方法从序列化器中复制数据到目标channel,我们可以看出来,每条记录都是写入到buffer中

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException,InterruptedException {
   // We should reset the initial position of the intermediate serialization buffer before
   // copying,so the serialization results can be copied to multiple target buffers.
   // 此处Serializer为SpanningRecordSerializer
   // reset方法将serializer内部的databuffer position重置为0
   serializer.reset();

   boolean pruneTriggered = false;
    // 获取目标channel的bufferBuilder
    // bufferBuilder内维护了MemorySegment,即内存片段
    // Flink的内存管理依赖MemorySegment,可实现堆内堆外内存的管理
    // RecordWriter内有一个bufferBuilder数组,长度和下游channel数目相同
    // 该数组以channel ID为下标,存储和channel对应的bufferBuilder
    // 如果对应channel的bufferBuilder尚未创建,调用requestNewBufferBuilder申请一个新的bufferBuilder  
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    // 复制serializer的数据到bufferBuilder中
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
    // 循环直到result完全被写入到buffer
    // 一条数据可能会被写入到多个缓存中
    // 如果缓存不够用,会申请新的缓存
    // 数据完全写入完毕之时,当前正在操作的缓存是没有写满的
    // 因此返回true,表明需要压缩该buffer的空间  
   while (result.isFullBuffer()) {
      finishBufferBuilder(bufferBuilder);

      // If this was a full record,we are done. Not breaking out of the loop at this point
      // will lead to another buffer request before breaking out (that would not be a
      // problem per se,but it can lead to stalls in the pipeline).
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }

      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(),"All data should be written at once");

   // 如果buffer超时时间为0,需要flush目标channel的数据
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

0x04 runtime

4.1 Driver

Driver是Flink runtime的一个重要概念,是在一个task中运行的用户业务逻辑组件,具体实现了批量操作代码。其内部API包括初始化,清除,运行,取消等逻辑。

public interface Driver<S extends Function,OT> {
   ......
   void setup(TaskContext<S,OT> context);
   void run() throws Exception;
   void cleanup() throws Exception;
   void cancel() throws Exception;
}

具体在 org.apache.flink.runtime.operators 目录下,我们能够看到各种Driver的实现,基本的算子都有自己的Driver。

......
CoGroupDriver.java
FlatMapDriver.java
FullOuterJoinDriver.java
GroupReduceCombineDriver.java
GroupReduceDriver.java
JoinDriver.java
LeftOuterJoinDriver.java
MapDriver.java
MapPartitionDriver.java
......

4.2 MapDriver

map算子对应的就是MapDriver。

结合上节我们知道,上游数据是通过batch方式批量传入的。所以,在run函数会遍历输入,每次取出一个record,然后调用用户自定义函数function.map对这个record做map操作。

public class MapDriver<IT,OT> implements Driver<MapFunction<IT,OT>,OT> {

   @Override
   public void run() throws Exception {
      final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
      .....
      else {
         IT record = null;
        
         // runtime主动进行循环,这样导致大量函数调用
         while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            output.collect(function.map(record)); // function是用户函数
         }
      }
   }
}

4.3 MapPartitionDriver

MapPartitionDriver是mapPartition的具体组件。系统会把得到的批量数据inIter一次性的都传给用户自定义函数,由用户代码来进行遍历操作

public class MapPartitionDriver<IT,OT> implements Driver<MapPartitionFunction<IT,OT> {
   @Override
   public void run() throws Exception {
     
		final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0),numRecordsIn);     
      ......
      } else {
         final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input,this.taskContext.<IT>getInputSerializer(0).getSerializer());

         // runtime不参与循环,这样可以减少函数调用
         function.mapPartition(inIter,output);
      }
   }
}

4.4 效率区别

我们能够看到map和mapPartition的input都是MutableObjectIterator input类型,说明两者的输入一致。只不过map是在Driver代码中进行循环,mapPartition在用户代码中进行循环。具体mapPartition的 效率提高体现在如下方面 :

  1. 假设一共有60个数据需要转换,map会在runtime中调用用户函数60次。
  2. runtime把数据分成6个partition操作,则mapPartition在runtime中会调用用户函数6次,在每个用户函数中分别循环10次。对于runtime来说,map操作会多出54次用户函数调用。
  3. 如果用户业务中需要频繁创建额外的对象或者外部资源操作,mapPartition的优势更可以体现。 例如将数据写入Mysql,那么map需要为每个元素创建一个数据库连接,而mapPartition为每个partition创建一个链接。

假设有上亿个数据需要map,这资源占用和运行速度效率差别会相当大。

0x05 优化和ChainedMapDriver

之前提到了优化,这里我们再详细深入下如何优化map算子。

Flink有一个关键的优化技术称为任务链,用于(在某些情况下)减少本地通信的过载。为了满足任务链的条件,至少两个以上的operator必须配置为同一并行度,并且使用本地向前的(local forwad)方式连接。任务链可以被认为是一种管道。

当管道以任务链的方式执行时候,Operators的函数被融合成单个任务,并由一个单独的线程执行。一个function产生的records,通过使用一个简单的方法调用,被递交给下一个function。所以这里在方法之间的records传递中,基本没有序列化以及通信消耗

针对优化后的Operator Chain,runtime对应的Driver则是ChainedMapDriver。这是通过 MAP(MapDriver.class,ChainedMapDriver.class,PIPELINED,0),映射得到的。

我们可以看到,因为是任务链,所以每个record是直接在管道中流淌 ,ChainedMapDriver连循环都省略了,直接map转换后丢给下游去也

public class ChainedMapDriver<IT,OT> extends ChainedDriver<IT,OT> {

   private MapFunction<IT,OT> mapper; // 用户函数

   @Override
   public void collect(IT record) {
      try {
         this.numRecordsIn.inc();
         this.outputCollector.collect(this.mapper.map(record));
      } catch (Exception ex) {
         throw new ExceptionInChainedStubException(this.taskName,ex);
      }
   }
}

// 这时的调用栈如下
map:23,UserFunc$1 (com.alibaba.alink)
collect:79,Thread (java.lang)

0x06 总结

map和mapPartition实现的基础是Flink的数据传输机制 :Flink确实是每次都处理一个输入record,但是在上下游之间传输时候,Flink还是把records累积起来做批量传输。即可以认为从数据传输模型角度讲,Flink是微批次的。

对于数据流转换,因为是批量传输,所以对于积累的records,map是在runtime Driver代码中进行循环,mapPartition在用户代码中进行循环。

map的函数调用次数要远高于mapPartition。如果在用户函数中涉及到频繁创建额外的对象或者外部资源操作,则mapPartition性能远远高出。

如果没有connection之类的操作,则通常性能差别并不大,通常不会成为瓶颈,也没有想象的那么严重。

0x07 参考

深入了解 Flink 网络栈 ——A Deep-Dive into Flink's Network Stack

Flink架构(二)- Flink中的数据传输

Flink 源码之节点间通信

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。