hadoop2.7之作业提交详解下 hadoop2.7之作业提交详解上

接着作业提交详解(上)继续写:在上一篇(hadoop2.7之作业提交详解(上))中已经讲到了YARNRunner.submitJob()

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]

那么现在接着从YARNRunner.submitJob()开始说:

先简单看一下YARNRunner这个类(摘录一部分):

package org.apache.hadoop.mapred;
public class YARNRunner implements ClientProtocol {
  private ResourceMgrDelegate resMgrDelegate; //这是RM派驻在“地方”上的特派员
  private ClientCache clientCache;
   Configuration conf;
  private final FileContext defaultFileContext;
  
  public YARNRunner(Configuration conf) {构造函数,需要创建特派员,然后调用下一个构造函数
   this(conf,new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  public YARNRunner(Configuration conf,ResourceMgrDelegate resMgrDelegate) {需要创建ClientCache
    ClientCache(conf,resMgrDelegate));
  }

  public YARNRunner(Configuration conf,ResourceMgrDelegate resMgrDelegate,ClientCache clientCache) {这是最终的构造函数
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient",ufe);
    }
  }
 JobStatus submitJob(JobID jobId,String jobSubmitDir,Credentials ts)
throws IOException,InterruptedException {
  
  addHistoryToken(ts);用于为历史记录服务,与“作业历史(JobHistory)”有关
  
   Construct necessary information to start the MR AM
  构建MR AM的必要启动信息
  创建一个ApplicationSubmissionContext,并将conf中的相关信息转移过去
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf,jobSubmitDir,ts);

   Submit to ResourceManager
   {
 /* 将作业提交给资源管理者(ResourceManager)*/
RM受理了所提交的作业以后,会把这个ContainerLaunchContext转发到某个NM节点
上,在那里执行这个shell命令行,另起一个Java虚拟机,让它执行MRAppMaster.class。
由此可见,这个ApplicationSubmissionContext对象appContext,真的是“代表着ResourceManager
为发起该应用的ApplicationMaster所需的全部信息”
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  }  (YarnException e) {
     IOException(e);
  }
}
}

其中createApplicationSubmissionContext方法的作用:
1、设置资源:默认内存为1536M,cpu的core为1
2、设置本地资源,比如临时工作目录,jar包等
3、设置安全票据tokens
4、设置启动AM的命令
5、检查map和reduce的配置信息
6、设置环境CLASSPATH等
7、为AM的container设置ContainerLaunchContext
8、设置ApplicationSubmissionContext
9、设置MRAppMaster的执行路径
并把配置块conf中当前的相关信息、已上传资料所在的目录路径以及有关身份和访问权限的信息都复制转移过去。提供了有关ApplicationMaster即“项目组长”该用哪一个Shell(例如bash)以及有关某些环境变量的信息。再如作业的名称等。

接下来就是调用ResourceMgrDelegate.submitApplication方法:(所以我们先看一下ResourceMgrDelegate这个类)

class ResourceMgrDelegate extends YarnClient {   
   YarnConfiguration conf;
   ApplicationSubmissionContext application;
   ApplicationId applicationId;
  protected YarnClient client;实际上是YarnClientImpl类的对象,那也是对YarnClient的继承和扩展
   Text rmDTService;
  这是ResourceMgrDelegate的构造方法
   ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
     conf;
  创建YarnClient对象client
  YarnClient.createYarnClient()创建的是YarnClientImpl
    this.client = YarnClient.createYarnClient();
    init(conf);这是由AbstractService类提供的,YarnClient是对AbstractService的扩展
    start();这也是由AbstractService类提供的
  }
 ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
           YarnException,IOException {
     client.submitApplication(appContext);//调用YarnClientImpl.submitApplication方法
  }

从前面所有的代码中我们可以得知:

ResourceMgrDelegate对象是在YARNRunner的构造函数中创建的。而YARNRunner,则是在前面的Cluster.Initialize()中创建的。再往上追溯,则Cluster类对象是在首次调用connect()时创建的。所以,任何一个节点,只要曾经调用过connect(),即曾经与“集群”有过连接,节点上就会有个Cluster类对象,从而就会有个YARNRunner对象,也就会有个ResourceMgrDelegate对象,而且如下所述就会有个YarnClientImpl对象。

现在为止,我们的作业提交路径是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication()]

解下来我们继续看YarnClientImpl.submitApplication()方法:

 appContext.getApplicationId();
    if (applicationId == null) {
       ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    创建一个SubmitApplicationRequestPBImpl类的记录块
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.);
    request.setApplicationSubmissionContext(appContext);设置好记录块中的Context
     Automatically add the timeline DT into the CLC
     Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);实际的跨节点提交

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,YarnApplicationState.NEW_SAVING,YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,YarnApplicationState.KILLED);        
    while (true {
    获取来自RM节点的应用状态报告,从中获取本应用的当前状态
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;作业已进入运行阶段,结束while循环
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

         Notify the client through the log every 10 poll,in case the client
         is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished," +
              "submitted application " + applicationId +
              " is still in " + state);
        }
         {
          Thread.sleep(submitPollIntervalMillis);
        }  (InterruptedException ie) {
          String msg = "Interrupted while waiting for application "
              + applicationId + " to be successfully submitted.";
          LOG.error(msg);
           YarnException(msg,ie);
        }
      }  (ApplicationNotFoundException ex) {
         FailOver or RM restart happens before RMStateStore saves
         ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);失败后的再次提交
      }
    }

     applicationId;
  }

从上看来只要是调用了rmClient.submitApplication(request)方法,那这儿rmClient又是个什么呢?我们接着来看一下YarnClientImpl这个类的简单定义:

class YarnClientImpl  YarnClient {

  static final Log LOG = LogFactory.getLog(YarnClientImpl.);

  protected ApplicationClientProtocol rmClient;
  protected long submitPollIntervalMillis;
   asyncApiPollIntervalMillis;
   asyncApiPollTimeoutMillis;
   AHSClient historyClient;
  boolean historyServiceEnabled;
   TimelineClient timelineClient;
  @VisibleForTesting
  Text timelineService;
  @VisibleForTesting
  String timelineDTRenewer;
   timelineServiceEnabled;
   timelineServiceBestEffort;

  final String ROOT = "root";

   YarnClientImpl() {
    super(YarnClientImpl..getName());
  }

从上可以看出rmClient是一个ApplicationClientProtocol对象,这个又是一个接口,具体的实现类是ApplicationClientProtocolPBClientImpl ,接下来我们看一下这个类:

class ApplicationClientProtocolPBClientImpl  ApplicationClientProtocol,Closeable {

   ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl( clientVersion,InetSocketAddress addr,Configuration conf)  IOException {
将配置项“rpc.engine.ApplicationClientProtocolPB”设置成ProtobufRpcEngine 
    RPC.setProtocolEngine(conf,ApplicationClientProtocolPB.);
创建proxy
这个proxy存在于用户为提交运行具体应用而起的那个JVM上,它既不属于
ResourceManager,也不属于NodeManager,而是一个独立的Java虚拟机,可以是在集群内的任何一台机器上
    proxy = RPC.getProxy(ApplicationClientProtocolPB. SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) 从请求request中取出其协议报文(message)部分 
  SubmitApplicationRequestProto requestProto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
   {
交由proxy将报文发送出去,并等候服务端回应 
将服务端回应包装成SubmitApplicationResponsePBImpl对象 
    return new SubmitApplicationResponsePBImpl(proxy.submitApplication( (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    ;
  }
}
 }

ApplicationClientProtocolPBClientImpl的submitApplication方法,在其里面就是调用proxy.submitApplication方法,而proxy是在构造函数中创建的。

  通过proxy发出的SubmitApplicationRequest,是以RM节点为目标的,最终经由操作系统提供的网络传输层以TCP报文的方式送达RM所在节点机上的对等层,那上面是
ProtoBuf,它会从TCP报文中还原出对端所发送的对象。再往上,那就是同样也实现了ApplicationClientProtocolPB界面的ApplicationClientProtocolPBServiceImpl,ProtoBuf这一
层会根据对方请求直接就调用其submitApplication()。这样,Client一侧对于ApplicationClientProtocolPBClientImpl所提供函数的调用就转化成Server一侧对于applicationClientProtocolPBServiceImpl所提供的对应函数的调用。当然,Server一侧函数调用的返回值也会转化成Client一侧的返回值,这就实现了远程过程调用RPC。不言而喻,Client/Server双方的这两个对象必须提供对同一个界面的实现,在这里就是ApplicationClientProtocolPB。

Client端
YARNRunner.submitJob() //这是处于顶层的应用层
ResourceMgrDelegate.submitApplication() //这是RM的代理
YarnClientImpl.submitApplication() //YARN框架的Client一侧
ApplicationClientProtocolPBClientImpl.submitApplication()//ApplicationClientProtocol界面
proxy.submitApplication() //ApplicationClientProtocolPB界面
Protocol内部实现的submitApplication() //在TCP/IP的基础上发送应用层的请求
Socket和TCP/IP //这是网络连接的最低层

 

Server端:
Server这一边就不同了。在Server这一边,结构的层次和函数调用的层次是相反的,结构上处于最底层的Socket和TCP/IP反倒处于函数调用栈的最高层,愈往下调用实质上就愈往结构上的高层走。这是因为TCP/IP报文最初到达的是底层,然后逐层往上递交的过程一般都是通过函数调用实现的,所以层层往下调用的过程反倒变成了层层往上递交的过程。

那么接下来就是通过tcp/ip调用服务端ApplicationClientProtocolPBServiceImpl.submitApplication()方法;

class ApplicationClientProtocolPBServiceImpl  ApplicationClientProtocolPB {

   ApplicationClientProtocol real;
  
   ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }
   SubmitApplicationResponseProto submitApplication(RpcController arg0,SubmitApplicationRequestProto proto)  ServiceException {
  SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);创建一个请求
   {
    SubmitApplicationResponse response = real.submitApplication(request); 
real为ClientRMService类对象 ,该对象在RM初始化时由createClientRMService() 方法创建
     ((SubmitApplicationResponsePBImpl)response).getProto();
  }  ServiceException(e);
  }  (IOException e) {
     ServiceException(e);
  }
}
}

接下来调用ClientRMService.submitApplication(request); 方法

 YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

   ApplicationSubmissionContext needs to be validated for safety - only
   those fields that are independent of the RM's configuration will be
   checked here,those that are dependent on RM configuration are validated
   in RMAppManager.

  String user = ;
   Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();获取用户
  }  (IOException ie) {
    LOG.warn("Unable to get the current user."throw RPCUtil.getRemoteException(ie);
  }

   Check whether app has already been put into rmContext, If it is,simply return the response
判断作业是否已经存在,如果是则直接返回实例
  if (rmContext.getRMApps().get(applicationId) != ) {
    LOG.info("This is an earlier submitted application: " + applicationId);
     SubmitApplicationResponse.newInstance();
  }
  如果没有设置队列,则使用默认队列
   if (submissionContext.getQueue() == ) { 
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  如果没有设置application名字,则使用默认的命名规则
  if (submissionContext.getApplicationName() == ) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  如果没有指定提交类型,则指定默认为yarn模式
  if (submissionContext.getApplicationType() == ) {
    submissionContext
      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } elseif (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
        .getApplicationType().substring(0 call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,System.currentTimeMillis(),user);提交作业到rmAppManager手中

    LOG.info("Application with id " + applicationId.getId() + 
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user,applicationId);
  }  (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(),e);
    RMAuditLogger.logFailure(user,e.getMessage(),1)"> e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.);
   response;
}

从作业提交的角度看,一旦进入了 RM 节点上的RMAppManagers. ubmitApplication(),作业的提交就已完成。 至于这以后的处理,那是 RM的事了,作业提交的最终流程就是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication() ]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。