hadoop2.7之作业提交详解上 hadoop2.7之作业提交详解下

根据wordcount进行分析:

import org.apache.hadoop.conf.Configuration;
 org.apache.hadoop.fs.Path;
 org.apache.hadoop.io.IntWritable;
 org.apache.hadoop.io.Text;
 org.apache.hadoop.mapreduce.Job;
 org.apache.hadoop.mapreduce.Mapper;
 org.apache.hadoop.mapreduce.Reducer;
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 java.io.IOException;


/**
 * @author: LUGH1
 * @date: 2019-4-8
 * @description:
 */
public class WordCount {
    static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.88.130:9000");
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.);

        job.setMapperClass(WdMapper.);
        job.setReducerClass(WdReducer.);

        job.setMapOutputKeyClass(Text.);
        job.setMapOutputValueClass(IntWritable.);

        job.setOutputKeyClass(Text.);
        job.setOutputValueClass(IntWritable.);

        FileInputFormat.setInputPaths(job,new Path("/test/word.txt"));
        FileOutputFormat.setOutputPath(job,1)">new Path("/test/output"));

        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);


        System.out.println("good job");
    }
}

class WdMapper extends Mapper<Object,Text,IntWritable> {
    @Override
    protected void map(Object key,Text value,Context context)  value.toString();
        String[] split = line.split(" ");
        for(String word : split){
            context.write(new Text(word),new IntWritable(1));
        }
    }
}

class WdReducer extends Reducer<Text,IntWritable,1)">void reduce(Text key,Iterable<IntWritable> values,InterruptedException {
        int count = 0;
        (IntWritable i : values){
            count += i.get();
        }
        context.write(key,1)"> IntWritable(count));
    }
}

这上面是个简单wordcount的代码,这里就不一一说明了,我们首先看main方法:获取一个job对象,然后经过一系列的设置,最后调用waitForCompletion方法

public static void main(String[] args) throws IOException,InterruptedException {  
 //....省略具体代码.....
   boolean result = job.waitForCompletion(true);  //调用由Job类提供的方法waitForCompletion()提交作业
   System.exit(result?0:1);
}

  接下来我们看下一调用waitForCompletion方法的这个类Job(由于类的内容很多,这里只展示我们需要的部分):

public class Job extends JobContextImpl implements JobContext {                                                                                                                                                                                                                                                                                                                                                                                                                                                                
  private static final Log LOG = LogFactory.getLog(Job.class);
  public static enum JobState {DEFINE,RUNNING}; //定义两种状态
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;  //表示最多2000毫秒刷新状态
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY ="mapreduce.client.progressmonitor.pollinterval";
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
  public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =  "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static enum TaskStatusFilter { NONE,KILLED,FAILED,SUCCEEDED,ALL }
  static {
    ConfigUtil.loadResources();  //加载配置
  }
  private JobState state = JobState.DEFINE;  //加载类的时候默认设置状态为DEFINE状态
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;    

 boolean waitForCompletion(booleanverbose) 
submit() setUseNewAPI() connect() getJobSubmitter(FileSystemfs,ClientProtocolsubmitClient) isUber() //是否“拼车”模式(MapTask与ReduceTask在同一节点上) setPartitionerClass()//Mapper的输出可能要由Partitioner按某种规则分发给多个Reducer setMapSpeculativeExecution() //是否需要有Speculative的Mapper起预备队的作用 setReduceSpeculativeExecution() //是否需要有Speculative的Reducer起预备队的作用 setCacheFiles()
}

  在Job类中有很多的静态变量,代码块等,我们知道在java中初始化会先加载静态的这些变量和代码块,所以我们在main方法中调用Job job = Job.getInstance(conf);方法的时候,就会对这些静态的变量和代码进行加载,这些静态的变量和代码块就是设置一些参数,比如设置job的默认状态的DEFINE状态,以及加载一些配置文件,加载配置文件的方法如下:

public static void loadResources() {
    addDeprecatedKeys();
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
  }

 记载配置文件就是加载hadoop的一些配置文件,所以在我们调用waitForCompletion方法之前这些都是已经加载好了的,接下来我们看waitForCompletion方法:

//org.apache.hadoop.mapreduce中的Job类
public boolean waitForCompletion(boolean verbose) throws IOException,InterruptedException,ClassNotFoundException {
if (state == JobState.DEFINE) {   //判断作业是否是DEFINE状态,防止重复提交作业
    submit();  //提交作业 
}  
if (verbose) { //提交之后监控其运行,直到作业结束
  monitorAndPrintJob();   //周期性报告作业进度情况
 } else {   //要不然就周期行询问作业是否文成
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =  Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
       Thread.sleep(completionPollIntervalMillis); 
      } catch (InterruptedException ie) {
      }
    }
 }
  return isSuccessful();
}

  

  从作业提交流程的角度看,这个方法的代码再简单不过了,实际就是对Job.submit()的调用,只是在调用之前要检查一下本作业是否处于 DEFINE 状态,以确保一个作业不会被提交多次。 如上所述,JobState的值只有 DEFINE 和 RUNNING 两种,具体Job对象创建之初在构造函数Job()中将其设置成 DEFINE,作业提交成功之后就将其改成 RUNNING,这就把门关上了。
  在正常的情况下,Job.submit() 很快就会返回,因为这个方法的作用只是把作业提交上去,而无须等待作业的执行和完成。 但是,在Job.submit()返回之后,Job.waitForCompletion()则要等待作业执行完成了以后才会返回。 在等待期间,如果参数verbose为true,就要周期地报告作业执行的进展,或者就只是周期地检测作业是否已经完成。

所以我们的作业提交流程目前是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit() ]

那么,接下来,看一看这个submit方法:

void submit() 
         final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(),cluster.getClient());//获取JobSubmitter的实例对象submitter 
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { //ugi.doAs用来控制权限 public JobStatus run() return submitter.submitJobInternal(Job.this,cluster); //真正用于提交作业 } }); state = JobState.RUNNING; //设置job的状态为RUNNING LOG.info("The url to track the job: " + getTrackingURL()); }

接下来我们先看connect方法:

private synchronized  connect()
          if (cluster == null) { //如果cluter为空,我们就创建一个cluster实例
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          return  Cluster(getConfiguration()); //创建cluster
                   }
                 });
    }
  }

可见connect()的作用就是保证节点上有个Cluster类对象,如果还没有,就创建一个。 那我们就看一下Cluster这个类(列出一部分):

 Cluster {
  @InterfaceStability.Evolving  enum JobTrackerStatus {INITIALIZING,RUNNING}; //作业跟踪状态
  private ClientProtocolProvider clientProtocolProvider; 集群版为YarnClientProtocolProvider ,本地模式为LocalClientProtocolProvider
  private ClientProtocol client;  在集群条件下,这是与外界通信的渠道和规则
  private UserGroupInformation ugi; 用来控制权限
  private Configuration conf;  配置信息
  private FileSystem fs = null; 文件系统
  private Path sysDir = 系统目录
  private Path stagingAreaDir = ; 
  private Path jobHistoryDir = 历史作业目录
  final Log LOG = LogFactory.getLog(Cluster.);
ServiceLoader<ClientProtocolProvider>,就是针对
ClientProtocolProvider类的ServiceLoader,而且这就是通过ServiceLoaderl.oad()装载的ServiceLoader实现了Iterable界面,
//提供一个iterator()函数,因而可以用在for循环中。
它还提供了一个load()方法,可以通过ClassLoader加载Class static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.); static { ConfigUtil.loadResources(); 加载配置文件 } 构造器 public Cluster(Configuration conf) IOException { this( Cluster(InetSocketAddress jobTrackAddr,Configuration conf) this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); initialize(jobTrackAddr,conf); 调用initialize方法 } 目的是要创建ClientProtocolProvider和ClientProtocol initialize(InetSocketAddress jobTrackAddr,Configuration conf) synchronized (frameworkLoader) { 不允许多个线程同时进入此段代码,需要加锁 for (ClientProtocolProvider provider : frameworkLoader) { 遍历frameworkLoader获取provider LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = ; try { if (jobTrackAddr == null) { 通过ClientProtocolProvider的create方法创建clientProtocol clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr,conf); } if (clientProtocol != ) { clientProtocolProvider = provider; client = clientProtocol; 已经创建了ClientProtocol对象,YARNRunner或LocalJobRunner LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; 成功后结束循环 } else { 失败,记录日志 LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: "if (null == clientProtocolProvider || null == client) { 判断是否创建了ClientProtocolProvider和ClientProtocol对象 throw IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); } }

  那么知道job类的connect方法就是确保有实例cluster,如果没有就通过Cluster的构造函数进行创建,在创建之前需要加载一些配置信息ConfigUtil.loadResources()和对静态的变量frameworkLoader等赋值,然后在调用Cluster的构造方法,在Cluster的构造方法中必定调用Cluster.initialize()方法,其中ClientProtocolProvider和ClientProtocol:用户向RM节点提交作业,是要RM为其安排运行,所以RM起着服务提供者的作用,而用户则处于客户的位置。既然如此,双方就得有个协议,对于双方怎么交互,乃至服务怎么提供,都得有个规定。在Hadoop的代码中,这所谓Protocol甚至被“上纲上线”到了计算框架的高度,连是否采用YARN框架也被纳入了这个范畴。实际上ClientProtocol就起着这样的作用,而ClientProtocolProvider顾名思义是ClientProtocol的提供者,起着有点像是Factory的作用。

至于ServiceLoader<ClientProtocolProvider>,那是用来装载ClientProtocolProvider的。

我们首先看一下这个类ClientProtocolProvider,很明显是一个抽象类,这意味着只有继承和扩充了这个抽象类的具体类才能被实体化成对象

abstract  ClientProtocolProvider {
  
  abstract ClientProtocol create(Configuration conf)  IOException;
  
  abstract ClientProtocol create(InetSocketAddress addr,Configuration conf)  IOException;

  void close(ClientProtocol clientProtocol)  IOException;

}

接下来我们看看这个抽象类的两个子类YarnClientProtocolProvider和LocalClientProtocolProvider 

package org.apache.hadoop.mapred;
class YarnClientProtocolProvider extends ClientProtocolProvider {
  @Override
  public ClientProtocol create(Configuration conf)  IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
     new YARNRunner(conf); YARNRunner实现了ClientProtocol接口
    }
    ;
  }
  @Override
return create(conf); } @Override if (clientProtocol instanceof YARNRunner) { ((YARNRunner)clientProtocol).close(); } }
class LocalClientProtocolProvider  IOException {
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME,MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      ;
    }
    conf.setInt(JobContext.NUM_MAPS,1); map数为1
    new LocalJobRunner(conf); LocalJobRunner实现了ClientProtocol接口
  }
  @Override
   LocalJobRunner doesn't use a socket
 close(ClientProtocol clientProtocol) {
     no clean up required
  }

现在返回来在聊聊Cluster.initialize()方法:

  其中ServiceLoader实现了Iterable界面,提供一个iterator()函数,因而可以用在for循环中。它还提供了一个load()方法,可以通过ClassLoader加载Class。此外,它还提供解析文件内容的功能装载了作为ServiceLoader对象的frameworkLoader,其LinkedHashMap中就有了上述的两个路径,这样就可以通过其iterator()函数依次引用这两个路径了

  然后,在Cluster类的构造函数中就会调用其initialize(),目的是要创建ClientProtocolProvider和ClientProtocol。

  但是ClientProtocolProvider是个抽象类,这意味着只有继承和扩充了这个抽象类的具体类才能被实体化成对象。Hadoop的源码中一共只有两个类扩充和落实了这个抽象类,那就是LocalClientProtocolProvider和YarnClientProtocolProvide

 

  可想而知,由这两种ClientProtocolProvider提供的ClientProtocol也是不一样的。事实上ClientProtocol是个界面,实现了这个界面的类也有两个,分别为LocalJobRunner和YARNRunner。但是实际使用的只能是其中之一。

  initialize的for循环,是基于前述ServiceLoader中iterator()的循环。实际上也就是对两个ClientProtocolProvider的循环,目的是要通过ClientProtocolProvider.create()创建用户所要求的ClientProtocol,也无非就是LocalJobRunner或YARNRunner。只要有一次创建成功,循环就没有必要继续了,因为只能有一种选择;但是,如果两次都失败,程序就无法继续了,因为不知道该怎样让RM提供计算服务。而能否成功创建,则取决于前述配置项的设置。不过ClientProtocolProvider是抽象类,实际上依次进行尝试的是LocalClientProtocolProvider和YarnClientProtocolProvider。假定第一轮循环时进行尝试的是前者,那么作业的流程就是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]

如果是后者,则作业的流程就是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]

这里我们假定以yarn方式提交,所以流程为第二种。

通过YarnClientProtocolProvider.create()方法,最终返回的是一个new YARNRunner(conf)对象。

  好了,继续回到我们的Job.submit()方法,到这里connect方法就算执行完毕了,接下就是对getJobSubmitter()的调用。 这个函数创建一个JobSubmitter类对象,然后Jobs. ubmit()就调用它的submitJobInternal()方法,完成作业的提交。创建JobSubmitter对象时的两个参数就是调用getJobSubmitter()时的两个参数,就是cluster.getFileSystem()和cluster.getClient()。 其中cluster.getClient()返回的就是 YARNRunner或LocalJobRunner;而cluster.getFileSystem()的返回结果对于 YARNRunner是 RM 节点上文件系统的 URL,对于 LocalJobRunner则是本节点上的一个相对路径为“mapred/system”的目录。

  接下来了解下JobSubmitter这个类(部分展示)

 

 org.apache.hadoop.mapreduce;
 JobSubmitter {
  final Log LOG = LogFactory.getLog(JobSubmitter.final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; //shuffle算法
  final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
   ClientProtocol submitClient;
   String submitHostName;
   String submitHostAddress;
  JobSubmitter(FileSystem submitFs,ClientProtocol submitClient) 
  this.submitClient = submitClient; 在集群条件下是YARNRunner 
    this.jtFs = submitFs;
  }

compareFs(FileSystemsrcFs,FileSystemdestFs) 比较两个文件系统是否相同
getPathURI()
checkSpecs()
copyRemoteFiles()
copyAndConfigureFiles()
copyJar(PathoriginalJarPath,PathsubmitJarFile,shortreplication)
addMRFrameworkToDistributedCache()
submitJobInternal(Jobjob,Clustercluster) 将作业提交给集群
writeNewSplits(JobContextjob,PathjobSubmitDir)
getJobSubmitter(FileSystem fs,ClientProtocol submitClient)//底层调用的就是JobSubmitter的构造方法
}

 

接下来看看submitJobInternal方法

JobStatus submitJobInternal(Job job,Cluster cluster) 
 ClassNotFoundException,IOException {

  validate the jobs output specs 验证输出格式等配置 
  checkSpecs(job);

  Configuration conf = job.getConfiguration(); 获取配置信息
  addMRFrameworkToDistributedCache(conf); 添加到缓存

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,conf); 获取目录路径
  configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost(); 获取本节点(该主机)的ip地址
  if (ip != ) {
    submitHostAddress = ip.getHostAddress();本节点IP地址的字符串形式 
    submitHostName = ip.getHostName();本节点名称 
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); 写入配置conf中
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID(); 设置JOBId(作业ID唯一)
  job.setJobID(jobId); 设置job的id
  Path submitJobDir = new Path(jobStagingArea,jobId.toString());本作业的临时子目录名中包含着作业ID号码 
  JobStatus status =  {
    conf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName()); 这是用户名
    conf.set("hadoop.http.filter.initializers"准备用于Http接口的过滤器初始化 
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR,submitJobDir.toString());设置提交job的路径
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
        + " as the submit dir");

     get delegation token for the dir  /* 准备好与访问权限有关的证件(token) */ 
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),1)">new Path[] { submitJobDir },conf); 获取与NameNode打交道所需证件 
    
    populateTokenCache(conf,job.getCredentials());

     generate a secret to authenticate shuffle transfers需要生成Mapper与Reducer之间的数据流动所用的密码 
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == ) {
      KeyGenerator keyGen;
       {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      }  (NoSuchAlgorithmException e) {
        new IOException("Error generating shuffle secret key" keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());
    }
     (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    copyAndConfigureFiles(job,submitJobDir);将可执行文件之类拷贝到HDFS中,默认的是保留10份,会存在不同的节点上

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);配置文件路径 
    
     Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job,submitJobDir);    设置map数,这里如何设置map的数量我会单独写一篇介绍,
    conf.setInt(MRJobConfig.NUM_MAPS,maps);
    LOG.info("number of splits:" + maps);

     write "queue admins of the queue to which job is being submitted"  to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME); 默认作业调度队列名为“default”

    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()),acl.getAclString());  设置acl权限 

     removing jobtoken referrals before copying the jobconf to HDFS
     as the tasks don't need this setting,actually they may break
     because of it if present as the referral will point to a
     different job.
    TokenCache.cleanUpTokenReferral(conf); 清楚Token引用的缓存

     (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
       Add HDFS tracking ids 如果启用了跟踪机制的话
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId()); 获取所有相关跟踪机制
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(new String[trackingIds.size()])); 设置跟踪机制
    }

     Set reservation info if it exists设置预设参数(如果有)
    ReservationId reservationId = job.getReservationId();
    if (reservationId != ) {
      conf.set(MRJobConfig.RESERVATION_ID,reservationId.toString());
    }

     Write job file to submit dir
    writeConf(conf,submitJobFile);将conf的内容写入一个.xml文件 
    
    //
     Now,actually submit the job (using the submit name)
    //
    printTokens(jobId,job.getCredentials());

提交作业,通过YarnRunner.submitJob()或LocalJobRunner.submitJob() 
    status = submitClient.submitJob(
        jobId,submitJobDir.toString(),job.getCredentials());
    if (status != ) {
      return status;  返回状态
    }  {
      new IOException("Could not launch job");
    }
  } finally {
    if (status == ) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != )
        jtFs.delete(submitJobDir,1)">true);  删除临时目录 

    }
  }
}

submitJobInternal方法可以得知,需要随同作业单一起提交的资源和信息有两类:

  一类是需要交到资源管理器RM手里,供RM在立项和调度时使用的;

  一类则并非供RM直接使用,而是供具体进行计算的节点使用的。前者包括本节点即作业提交者的IP地址、节点名、用户名、作业ID号,以及有关MapReduce计算输入数据文件的信息,还有为提交作业而提供的“证章(Token)”等。这些信息将被打包提交给RM,这就是狭义的作业提交,是流程的主体。后者则有作业执行所需的jar可执行文件、外来对象库等。如果计算的输入文件在本地,则后者还应包括输入文件。这些资源并不需要提交给RM,因为RM本身并不需要用到这些资源,但是必须要把这些资源复制或转移到全局性的HDFS文件系统中,让具体承担计算任务的节点能够取用。

  为了上传相关的资源和信息,需要在HDFS文件系统中为本作业创建一个目录。HDFS文件系统中有一个目录是专门用于作业提交的,称为“舞台目录(stagingdirectory)”。所以这里要通过JobSubmissionFiles.getStagingDir()从集群获取这个目录的路径。然后就以本作业的ID,即JobId为目录名在这个舞台目录中创建一个临时的子目录,这就是代码中的submitJobDir。以后凡是与本作业有关的资源和信息,就都上传到这个子目录中。

  这个方法还包括设置map数,执行队列呀等最后执行connect()方法中创建的对象YARNRunner(或者是LocalJobRunner)的submitJob方法。这样我们的作业就提交给RM了,作业流程如下:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]

可继续看(hadoop2.7之作业提交详解(下)

 

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。