阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop JobTracker提交job源码浅析

181次阅读
没有评论

共计 9338 个字符,预计需要花费 24 分钟才能阅读完成。

上一篇文章说到 jobClient 提交 job 的过程,这篇文章是接着上一篇文章继续写的 http://www.linuxidc.com/Linux/2013-12/93700.htm

上一篇说到 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()) 这里,这里就是 jobTracker 进行 job 的提交过程,还有一个 JobSubmissionProtocol 的实现是 LocalJobRunner,这是本地执行的时候使用的,真正集群运行 Job 还是使用的 jobTracker,所以只看 jobTracker 类的 submitJob。

1.jobTracker.submitJob():第一句就是 checkJobTrackerState() 这个是检查 jobTracker 状态,是否运行中,这里说一句,jobTracker 是在 Hadoop 集群启动的时候启动的,也就是在执行 start-all 或者 start-mapred 的时候启动,启动的时候会调用 JobTracker 的 main 方法,然后在 jps 的时候就可以看见一个 jobTracker 的进程了。下面来看一下 JobTracker.main() 方法。

2.JobTracker.main():第一句是 JobTracker tracker = startTracker(new JobConf()),这是实例化一个 jobTracke 实例。

3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个 jobTracker 对象,在实例化的时候会做很多事,所以还是进去瞅瞅。

4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化 taskScheduler 的内容:Class<? extends TaskScheduler> schedulerClass

      = conf.getClass(“mapred.jobtracker.taskScheduler”,JobQueueTaskScheduler.class, TaskScheduler.class);taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的 taskScheduler 类名,通过反射获得对应的 taskScheduler 对象,在实例化的时候虽然不同的 TaskScheduler 具体操作不一样,但是统一的都会初始化一个 JobListener 对象,这个对象就是后面将要监听 job 的 listener。剩下的内容就不说了。回到 JobTracker.startTracker() 方法。

5.JobTracker.JobTracker():在实例化 jobTracker 之后,会执行 result.taskScheduler.setTaskTrackerManager(result),这个就是将 jobTracker 对象设置给 taskScheduler。后面就什么了,现在可以回到 main 方法了

public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
  throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize(“JobTracker”);
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn(“Error starting tracker: ” +
                StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register(“JobTracker”, “JobTrackerInfo”, result);
      if(initialize == true) {
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
        result.initializeFilesystem();
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
        result.initialize();
      }
    }
    return result;
  }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93701p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

6.JobTracker.main():在实例化 jobTracker 之后,会调用 tracker.offerService() 方法,之后 main 方法就没什么了,下面看看 tracker.offerService() 这个方法。

public static void main(String argv[]
                          ) throws IOException, InterruptedException {
    StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
   
    try {
      if(argv.length == 0) {
        JobTracker tracker = startTracker(new JobConf());
        tracker.offerService();
      }
      else {
        if (“-dumpConfiguration”.equals(argv[0]) && argv.length == 1) {
          dumpConfiguration(new PrintWriter(System.out));
        }
        else {
          System.out.println(“usage: JobTracker [-dumpConfiguration]”);
          System.exit(-1);
        }
      }
    } catch (Throwable e) {
      LOG.fatal(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

7.JobTracker.offerService():这个方法中有一些其他东西,略掉,只看 taskScheduler.start() 这个方法,因为这里只是想分析下 JobTracker 提交 job 的过程,所以省去很多复杂的东西。

 

8.taskScheduler.start():这个方法就是启动 TaskScheduler,这个方法不同 taskScheduler 也不同,但是统一的还是会有一个 taskTrackerManager.addJobInProgressListener(jobListener) 这个操作,taskTrackerManager 就是 jobTracker(第 5 步),这句的意思是为 jobTracker 添加 jobListener,用来监听 job 的。这句的内部就是调用 jobTracker 的 jobInProgressListeners 集合的 add(listener) 方法。

到这里可以说看完了整个 JobTracker 的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看 job 的提交过程,也就是 jobTracker 的 submit() 方法。

1.jobTracker.submit():第一步是 checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。然后执行 jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个 jobInfo 对象,jobInfo 主要保存 job 的 id,user,jobSubmitDir(也就是 job 的任务目录,上一篇文章提到)。接着是判断 job 是否可被 recovered(job 失败的时候尝试再次执行),如果允许的话 (默认允许),则将 jobInfo 对象序列化到 job-info 文件中。接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),为 job 实例化一个 JobInProgress 对象,这个对象将会对 job 以后的所有情况进行负责,如初始化,执行等。下面看看 JobInProgress 对象的初始化操作。

2.JobInProgress:这里看下将 job.xml 下载到本地的操作。然后就是 job 的队列信息,默认的队列名是 default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据 Hadoop 所使用的 taskScheduler 有关,具体不研究。剩下的是一些参数的初始化,如 map 的数目,reduce 的数目等。这里还有个设置 job 的优先级的,默认是 normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority); 还有检查 taskLimit 的操作,就是检查 map+reduce 的任务数是否超出 mapred.jobtracker.maxtasks.per.job 设置的值,默认是 -1,就是没有限制的意思。回到 jobTracker.submit() 方法

this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
          +”/”+jobId + “.xml”);
      Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
      jobFile = jobFilePath.toString();
      fs.copyToLocalFile(jobFilePath, localJobFile);
      conf = new JobConf(localJobFile);

上一篇文章说到 jobClient 提交 job 的过程,这篇文章是接着上一篇文章继续写的 http://www.linuxidc.com/Linux/2013-12/93700.htm

上一篇说到 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()) 这里,这里就是 jobTracker 进行 job 的提交过程,还有一个 JobSubmissionProtocol 的实现是 LocalJobRunner,这是本地执行的时候使用的,真正集群运行 Job 还是使用的 jobTracker,所以只看 jobTracker 类的 submitJob。

1.jobTracker.submitJob():第一句就是 checkJobTrackerState() 这个是检查 jobTracker 状态,是否运行中,这里说一句,jobTracker 是在 Hadoop 集群启动的时候启动的,也就是在执行 start-all 或者 start-mapred 的时候启动,启动的时候会调用 JobTracker 的 main 方法,然后在 jps 的时候就可以看见一个 jobTracker 的进程了。下面来看一下 JobTracker.main() 方法。

2.JobTracker.main():第一句是 JobTracker tracker = startTracker(new JobConf()),这是实例化一个 jobTracke 实例。

3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个 jobTracker 对象,在实例化的时候会做很多事,所以还是进去瞅瞅。

4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化 taskScheduler 的内容:Class<? extends TaskScheduler> schedulerClass

      = conf.getClass(“mapred.jobtracker.taskScheduler”,JobQueueTaskScheduler.class, TaskScheduler.class);taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的 taskScheduler 类名,通过反射获得对应的 taskScheduler 对象,在实例化的时候虽然不同的 TaskScheduler 具体操作不一样,但是统一的都会初始化一个 JobListener 对象,这个对象就是后面将要监听 job 的 listener。剩下的内容就不说了。回到 JobTracker.startTracker() 方法。

5.JobTracker.JobTracker():在实例化 jobTracker 之后,会执行 result.taskScheduler.setTaskTrackerManager(result),这个就是将 jobTracker 对象设置给 taskScheduler。后面就什么了,现在可以回到 main 方法了

public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
  throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize(“JobTracker”);
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn(“Error starting tracker: ” +
                StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register(“JobTracker”, “JobTrackerInfo”, result);
      if(initialize == true) {
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
        result.initializeFilesystem();
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
        result.initialize();
      }
    }
    return result;
  }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93701p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

3.jobTracker.submit():实例化 JobInProgress 之后,会根据 jobProfile 获取 job 的队列信息,并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况 checkMemoryRequirements(job),再调用 taskScheduler 的 taskScheduler.checkJobSubmission(job) 检查任务提交情况(具体是啥玩意,不太情况)。接下来就是执行 status = addJob(jobId, job),为 Job 设置 listener。

 

4.jobTracker.addJob():前面说过,在初始化 jobTracker 的时候会实例化 taskScheduler,然后调用 taskScheduler 的 start() 方法,为 jobTracker 添加 JobListener 对象,所以这里的 JobInProgressListener 对象就是相应的 taskScheduler 的 JobListener,这里为 job 添加了 JobListener。

private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
  throws IOException {
    totalSubmissions++;

    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);

    LOG.info(“Job ” + jobId + ” added successfully for user ‘”
            + job.getJobConf().getUser() + “‘ to queue ‘”
            + job.getJobConf().getQueueName() + “‘”);
    AuditLogger.logSuccess(job.getUser(),
        Operation.SUBMIT_JOB.name(), jobId.toString());
    return job.getStatus();
  }

到这里整个 JobTracker 的 job 提交过程就结束了,中间很多东西没有深入去研究,只是浅显的了解了下,如有错误,请指出,谢谢

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计9338字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中