阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

FairScheduler job初始化过程源码浅析

258次阅读
没有评论

共计 16259 个字符,预计需要花费 41 分钟才能阅读完成。

上一篇文章(http://www.linuxidc.com/Linux/2013-12/93701.htm)说到了 jobTracker 中的 submitJob()方法,这个方法最终会调用 listener.jobAdded(job),将 Job 注册到 TaskScheduler 中,由其进行调度。今天接着研究。Hadoop 中默认的 TaskScheduler 是 JobQueueTaskScheduler,采用的是 FIFO(先进先出)原则进行调度,还有 FiarScheduler 和 CapacityTaskScheduler 两种调度类(非 hadoop 自带,不过 hadoop 也把他们加入到类库中),这两个类可以在 hadoop 目录下的 lib 包下找到,源码在 src/contrib 下可以找到。主要对 FairScheduler 进行解读。

上文提到 jobTracker 最终将 job 注册到 jobListener 中,下面就来看看 FairScheduler 的 JobListener。

1.FairScheduler.JobListener.addJob():这个方法比较简单,JobSchedulable mapSched = ReflectionUtils.newInstance(conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class, JobSchedulable.class), conf)这里通过反射获得两个 JobSchedulable 对象,也就是默认的 FairScheduler.JobSchedulable 对象,一个是 mapSched,一个是 redSched,然后进行 JobSchedulable 的初始化,比较简单。infos.put(job, info)将 job 添加到 infos(存放所有的 jobInPorgress 对象)中,同时将 job 添加到 PoolScheduable 中,主要是根据配置的 poolName 获取对应的 pool。下面的是重点,update()方法,下面看看这个方法。

public void jobAdded(JobInProgress job) {
      synchronized (FairScheduler.this) {
        eventLog.log(“JOB_ADDED”, job.getJobID());
        JobSchedulable mapSched = ReflectionUtils.newInstance(
            conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class,
                JobSchedulable.class), conf);
        mapSched.init(FairScheduler.this, job, TaskType.MAP);

        JobSchedulable redSched = ReflectionUtils.newInstance(
            conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class,
                JobSchedulable.class), conf);
        redSched.init(FairScheduler.this, job, TaskType.REDUCE);

        JobInfo info = new JobInfo(mapSched, redSched);
        infos.put(job, info);
        poolMgr.addJob(job); // Also adds job into the right PoolScheduable
        update();
      }
    }

2.FairScheduler.update():跳过看不懂的,直接看 poolMgr.reloadAllocsIfNecessary(),这个方法主要是读取 FairScheduler 的配置文件(fair-scheduler.xml),由 mapred.fairscheduler.allocation.file 参数设置,这里是根据配置文件的最后修改时间 +ALLOC_RELOAD_INTERVAL 决定是否重新加载配置文件,加载文件的时候就是简单地读取 xml 文件。接着看 update 方法,加载完配置文件之后会遍历 infos(保存了 FairScheduler 所有的 jobInProgress),遍历的时候去除成功了的 job 和失败了的 job 以及被 kill 掉的 job,同时也会从 pool 中去掉该 job。接下来就是 updateRunnability(),这个方法会根据 userMaxJob 以及 poolMaxJob 数量进行判断是否启动 job。

 List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
      for (JobInProgress job: infos.keySet()) {
        int runState = job.getStatus().getRunState();
        if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
          || runState == JobStatus.KILLED) {
            toRemove.add(job);
        }
      }
      for (JobInProgress job: toRemove) {
        jobNoLongerRunning(job);
      }

3.FairScheduler.updateRunnability():第一步将所有 infos 中剩余的 job(成功以及失败的任务会在 update 时清除)状态全部设为 notrunning。接着对 infos 中的 job 进行排序,Collections.sort(jobs, new FifoJobComparator()),排序规则是 FIFO 原则(奇怪,不懂)。然后接着对 jobs 进行遍历,同时根据该 job 的提交用户和提交的 pool 的最大提交 job 数量决定是否将其添加到任务队列中(就是两个 list),如果该 job 状态 =RUNNING,则 jobinfo.running=true,如果 job 状态 =PREP(准备中),则对其进行初始化(注意这里只对 job 状态 =RUNNING 和 PREP 的 job 进行操作)。jobInitializer.initJob(jobInfo, job)进行 job 初始化,这里使用到 jdk 的 threadPool(其实就是将 thread 加入到线程池中,由线程池绝对什么时候对其进行执行,总之都会调用 thread 的 run 方法),看看 thread 的 run 方法。run 方法中调用 ttm.initJob(job),此处的 ttm 就是 jobTracker,现在回到 jobTracker 去。

 if (userCount < poolMgr.getUserMaxJobs(user) &&
          poolCount < poolMgr.getPoolMaxJobs(pool)) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
            job.getStatus().getRunState() == JobStatus.PREP) {
          userJobs.put(user, userCount + 1);
          poolJobs.put(pool, poolCount + 1);
          JobInfo jobInfo = infos.get(job);
          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
            jobInfo.runnable = true;
          } else {
            // The job is in the PREP state. Give it to the job initializer
            // for initialization if we have not already done it.
            if (jobInfo.needsInitializing) {
              jobInfo.needsInitializing = false;
              jobInitializer.initJob(jobInfo, job);
            }
          }
        }
      }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93702p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

4.JobTracker.initJob():主要调用 job.initTasks(),下面进入到 JobInProgress.initTasks()。

 

5.JobInProgress.initTasks():为 job 对象设置优先级 setPriority(this.priority),接着读取分片信息文件获取分片信息,SplitMetaInfoReader.readSplitMetaInfo()这个方就是 jobInPorgress 用来读取分分片信息的,读取过程与写入过程相对应,具体还是较简单的。读取了分片信息之后,根据分片数量创建相应数量的 mapTask(TaskInProgress 对象),接下来会执行 nonRunningMapCache = createCache(splits, maxLevel),这个方法是根据每个分片的 location 信息,然后根据 location 的 host 判断每个 host 上所有的 job,并放入 cache 中。接着根据设置的 reduce 数量新建对应的 reduceTask(TaskInProgress 对象),并加入到 nonRunningReduces 队列中,并根据 mapred.reduce.slowstart.completed.maps(百分比,默认是 5%)参数的值计算 completedMapsForReduceSlowstart(多少 map 任务完成的时候启动 reduce 任务)。之后就是分别新建两个 setUp 任务和 cheanUp 任务,分别对应 map 和 reduce task。到此 initTask 完成,initTask 完成 JobTracker 的 initJob 也就差不多完成了,接着 FairScheduler 的 updateRunnability()也就完成了。回到 FairScheduler.update()。

6.FairScheduler.update():

for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().updateDemand();
        pool.getReduceSchedulable().updateDemand();
      }
     
      // Compute fair shares based on updated demands
      List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
      List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
      SchedulingAlgorithms.computeFairShares(
          mapScheds, clusterStatus.getMaxMapTasks());
      SchedulingAlgorithms.computeFairShares(
          reduceScheds, clusterStatus.getMaxReduceTasks());
     
      // Use the computed shares to assign shares within each pool
      for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().redistributeShare();
        pool.getReduceSchedulable().redistributeShare();
      }
     
      if (preemptionEnabled)
        updatePreemptionVariables();
    }

看不懂,先到这吧,等下次慢慢研究吧。

上一篇文章(http://www.linuxidc.com/Linux/2013-12/93701.htm)说到了 jobTracker 中的 submitJob()方法,这个方法最终会调用 listener.jobAdded(job),将 Job 注册到 TaskScheduler 中,由其进行调度。今天接着研究。Hadoop 中默认的 TaskScheduler 是 JobQueueTaskScheduler,采用的是 FIFO(先进先出)原则进行调度,还有 FiarScheduler 和 CapacityTaskScheduler 两种调度类(非 hadoop 自带,不过 hadoop 也把他们加入到类库中),这两个类可以在 hadoop 目录下的 lib 包下找到,源码在 src/contrib 下可以找到。主要对 FairScheduler 进行解读。

上文提到 jobTracker 最终将 job 注册到 jobListener 中,下面就来看看 FairScheduler 的 JobListener。

1.FairScheduler.JobListener.addJob():这个方法比较简单,JobSchedulable mapSched = ReflectionUtils.newInstance(conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class, JobSchedulable.class), conf)这里通过反射获得两个 JobSchedulable 对象,也就是默认的 FairScheduler.JobSchedulable 对象,一个是 mapSched,一个是 redSched,然后进行 JobSchedulable 的初始化,比较简单。infos.put(job, info)将 job 添加到 infos(存放所有的 jobInPorgress 对象)中,同时将 job 添加到 PoolScheduable 中,主要是根据配置的 poolName 获取对应的 pool。下面的是重点,update()方法,下面看看这个方法。

public void jobAdded(JobInProgress job) {
      synchronized (FairScheduler.this) {
        eventLog.log(“JOB_ADDED”, job.getJobID());
        JobSchedulable mapSched = ReflectionUtils.newInstance(
            conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class,
                JobSchedulable.class), conf);
        mapSched.init(FairScheduler.this, job, TaskType.MAP);

        JobSchedulable redSched = ReflectionUtils.newInstance(
            conf.getClass(“mapred.jobtracker.jobSchedulable”, JobSchedulable.class,
                JobSchedulable.class), conf);
        redSched.init(FairScheduler.this, job, TaskType.REDUCE);

        JobInfo info = new JobInfo(mapSched, redSched);
        infos.put(job, info);
        poolMgr.addJob(job); // Also adds job into the right PoolScheduable
        update();
      }
    }

2.FairScheduler.update():跳过看不懂的,直接看 poolMgr.reloadAllocsIfNecessary(),这个方法主要是读取 FairScheduler 的配置文件(fair-scheduler.xml),由 mapred.fairscheduler.allocation.file 参数设置,这里是根据配置文件的最后修改时间 +ALLOC_RELOAD_INTERVAL 决定是否重新加载配置文件,加载文件的时候就是简单地读取 xml 文件。接着看 update 方法,加载完配置文件之后会遍历 infos(保存了 FairScheduler 所有的 jobInProgress),遍历的时候去除成功了的 job 和失败了的 job 以及被 kill 掉的 job,同时也会从 pool 中去掉该 job。接下来就是 updateRunnability(),这个方法会根据 userMaxJob 以及 poolMaxJob 数量进行判断是否启动 job。

 List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
      for (JobInProgress job: infos.keySet()) {
        int runState = job.getStatus().getRunState();
        if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
          || runState == JobStatus.KILLED) {
            toRemove.add(job);
        }
      }
      for (JobInProgress job: toRemove) {
        jobNoLongerRunning(job);
      }

3.FairScheduler.updateRunnability():第一步将所有 infos 中剩余的 job(成功以及失败的任务会在 update 时清除)状态全部设为 notrunning。接着对 infos 中的 job 进行排序,Collections.sort(jobs, new FifoJobComparator()),排序规则是 FIFO 原则(奇怪,不懂)。然后接着对 jobs 进行遍历,同时根据该 job 的提交用户和提交的 pool 的最大提交 job 数量决定是否将其添加到任务队列中(就是两个 list),如果该 job 状态 =RUNNING,则 jobinfo.running=true,如果 job 状态 =PREP(准备中),则对其进行初始化(注意这里只对 job 状态 =RUNNING 和 PREP 的 job 进行操作)。jobInitializer.initJob(jobInfo, job)进行 job 初始化,这里使用到 jdk 的 threadPool(其实就是将 thread 加入到线程池中,由线程池绝对什么时候对其进行执行,总之都会调用 thread 的 run 方法),看看 thread 的 run 方法。run 方法中调用 ttm.initJob(job),此处的 ttm 就是 jobTracker,现在回到 jobTracker 去。

 if (userCount < poolMgr.getUserMaxJobs(user) &&
          poolCount < poolMgr.getPoolMaxJobs(pool)) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
            job.getStatus().getRunState() == JobStatus.PREP) {
          userJobs.put(user, userCount + 1);
          poolJobs.put(pool, poolCount + 1);
          JobInfo jobInfo = infos.get(job);
          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
            jobInfo.runnable = true;
          } else {
            // The job is in the PREP state. Give it to the job initializer
            // for initialization if we have not already done it.
            if (jobInfo.needsInitializing) {
              jobInfo.needsInitializing = false;
              jobInitializer.initJob(jobInfo, job);
            }
          }
        }
      }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93702p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

把上次遗留的问题继续研究一下。

for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().updateDemand();
        pool.getReduceSchedulable().updateDemand();
      }

这里是更新每个 pool 的 slot 需求情况,下面来看看,pool.getMapSchedulable().updateDemand(),pool.getReduceSchedulable().updateDemand()两个基本相同。

 

7.PoolSchedulable.updateDemand():第一句 poolMgr.getMaxSlots(pool.getName(), taskType)是获取 pool 的最大 slot 数量,从配置文件获取,配置文件是之前加载过的,前面有说到。每个 PoolSchedulable 中都会存在多个 JobSchedulable 对象,在 JobListener.addJob()时添加。一个 JobSchedulable 对应一个 jobInProgress 对象。然后调用 JobSchedulable.updateDemand()更新每个 JobSchedulable 的 slot 的需求。

public void updateDemand() {
    // limit the demand to maxTasks
    int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
    demand = 0;
    for (JobSchedulable sched: jobScheds) {
      sched.updateDemand();
      demand += sched.getDemand();
      if (demand >= maxTasks) {
        demand = maxTasks;
        break;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(“The pool ” + pool.getName() + ” demand is ” + demand
          + “; maxTasks is ” + maxTasks);
    }
  }

8.JobSchedulable.updateDemand():首先第一步就是判断该 JobSchedulable 的 job 是否已运行 (RUNNING),没有运行则不分配 slot。然后判断该 JobSchedulable 是 Map 还是 Reduce,如果是 Reduce 则需先判断完成的 Map 数量(finishedMapTasks) 数量 + 失败的 Map(failedMapTIPs)数量 >=completedMapsForReduceSlowstart(由 ”mapred.reduce.slowstart.completed.maps 参数值 *numMapTasks),满足则表示 Reduce 任务可以启动,否则不可启动。而对于 Map 任务直接计算其 slot 需求。TaskInProgress[] tips = (taskType == TaskType.MAP ? job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE)),获取对应的 taskInPorgress 数量 (tip),boolean speculationEnabled = (taskType == TaskType.MAP ?job.getMapSpeculativeExecution() : job.getReduceSpeculativeExecution()) 判断是否启用推测执行,double avgProgress = (taskType == TaskType.MAP ?job.getStatus().mapProgress() : job.getStatus().reduceProgress())获取 map/reduce 任务的进度,即 map/reduce 已完成多少,之后计算每个 taskInProgress 的 slot 需求。如果 taskInProgress 未完成则正在运行中,则 demand += tip.getActiveTasks().size()计算出所需的 slot 数量,而 tip 的 ActiveTasks 则是任务调用的时候,即调用 tip.addRunningTask()方法时添加的,而该方法的调用者则是 FairScheduler 的 assignTasks()方法,即方法调度。获取到 tip 的 activeTasks 数量,则就是该 tip 所需要的 slot 数量,同时如果启用了推测执行,则还需多加一个 slot 用于推测执行任务,这样就获得了一个 JobSchedulable 所需的总 slot 数量,求和即为这个 pool 所需的总 slot 数量,当所需数量大于 maxTasks(该 pool 所拥有的最大 slot 数),则返回。继续回到 FairScheduler.update()方法。

 

9.FairScheduler.update():

List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
      List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
      SchedulingAlgorithms.computeFairShares(
          mapScheds, clusterStatus.getMaxMapTasks());
      SchedulingAlgorithms.computeFairShares(
          reduceScheds, clusterStatus.getMaxReduceTasks());
     
      // Use the computed shares to assign shares within each pool
      for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().redistributeShare();
        pool.getReduceSchedulable().redistributeShare();
      }
     
      if (preemptionEnabled)
        updatePreemptionVariables();

这里涉及的就是 FairScheduler 的核心之处——资源分配算法。先看看前两句,前两句就是获取所有的 MapPoolSchedulable 和 ReducePoolSchedulable,一个 pool 中分别包含一个 MapPoolSchedulable 和 ReducePoolSchedulable。下面两句就是具体的资源分配,调用的是 SchedulingAlgorithms 类进行资源分配的。

 

10.SchedulingAlgorithms.computeFairShares():

private static double slotSUSEdWithWeightToSlotRatio(double w2sRatio,
      Collection<? extends Schedulable> schedulables) {
    double slotsTaken = 0;
    for (Schedulable sched: schedulables) {
      double share = computeShare(sched, w2sRatio);
      slotsTaken += share;
    }
    return slotsTaken;
  }

调用 computeShare()方法根据 job 的 weight 和 w2sRatio(相当于总权重,1.0)计算每个 Schedulable 根据权重应该获得 slot 数量。

 

11.SchedulingAlgorithms.computeShare():第一句 double share = sched.getWeight() * w2sRatio,获取 Pool 的权重,该权重是在 fair-scheduler.xml 中设置 pool 时为 pool 设置了 weigth,默认是 1.0。获得 job 权重之后,根据 weigth*w2sRatio 获得一个 share 值,然后 share=Math.max(share, sched.getMinShare())(minShare 默认是 0),share = Math.min(share, sched.getDemand()),即获得 share 值。

public double getJobWeight(JobInProgress job, TaskType taskType) {
    if (!isRunnable(job)) {
      // Job won’t launch tasks, but don’t return 0 to avoid division errors
      return 1.0;
    } else {
      double weight = 1.0;
      if (sizeBasedWeight) {
        // Set weight based on runnable tasks
        JobInfo info = infos.get(job);
        int runnableTasks = (taskType == TaskType.MAP) ?
            info.mapSchedulable.getDemand() :
            info.reduceSchedulable.getDemand();
        weight = Math.log1p(runnableTasks) / Math.log(2);
      }
      weight *= getPriorityFactor(job.getPriority());
      if (weightAdjuster != null) {
        // Run weight through the user-supplied weightAdjuster
        weight = weightAdjuster.adjustWeight(job, taskType, weight);
      }
      return weight;
    }
  }
private static double computeShare(Schedulable sched, double w2sRatio) {
    double share = sched.getWeight() * w2sRatio;
    share = Math.max(share, sched.getMinShare());
    share = Math.min(share, sched.getDemand());
    return share;
  }

12.SchedulingAlgorithms.computeFairShares:返回到该方法,全总体来看其实这里是一个算法(好吧,这个类本身就是一个算法类),这个算法旨在找出一个合适的 fairShare 值,使得所有 job 的权重 *fairShare 之和最接近 cap 值(Math.min(totalDemand, totalSlots)),这是一个二分查找算法,至于这样做的原因可以参见 SchedulingAlgorithms.computeFairShares 的注释,大致的意思好像是这个值叫做 weighted fair sharing,We call R the weight-to-slots ratio because it converts a Schedulable’s weight to the number of slots it is assigned。也就是这个可以根据这个值 *pool 的权重得到该 pool 所分配到的 slot 数量。就到这,英语不好看不太懂,等别人解释吧。最后 sched.setFairShare(computeShare(sched, right))将这个值设置到 PoolSchedulable 中。

for (Schedulable sched: schedulables) {
      totalDemand += sched.getDemand();
    }
    double cap = Math.min(totalDemand, totalSlots);
    double rMax = 1.0;
    while (slotSUSEdWithWeightToSlotRatio(rMax, schedulables) < cap) {
      rMax *= 2.0;
    }
    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
        left = mid;
      } else {
        right = mid;
      }
    }

13.FairScheduler.update():后面是对每个 JobSchedulable 使用同上方法计算一个 fairShare 值,意义是为 pool 中的每个 job 的可分配的 slot 数量。这里同样会计算 job 的 weigth,job 的权重是由 FairScheduler 计算得到的,在计算权重时,可以选择是否开启根据 job 长度调整权重(由 mapred.fairscheduler.sizebasedweight 参数控制,默认 false),然后根据 job 的优先级判断相应的权重,其对应关系:优先级:VERY_HIGH- 权重:4.0/HIGH:2.0/NORMAL:1.0/LOW:0.5,最后根据 weightAdjuster 进行调整 job 的权重,需要手动实现,由 mapred.fairscheduler.weightadjuster 参数设置,如果你自定义了一个 weightAdjuster 类,则可以通过重写 adjustWeight()方法控制 job 的权重。总之默认情况下一个 job 的权重只是取决于该 Job 优先级。后面的跳过,不是太懂

 

14.FairScheduler.update():最后是判断是否支持抢占机制,即当一个资源池资源有剩余是否允许将剩余资源共享给其他资源池。具体是判断每个资源池中正在运行的任务是否小于资源池本身最小资源量或者需求量,同时还判断该资源池是否急于将资源共享给其他资源,即资源使用量低于共享量的一半。

 private void updatePreemptionVariables() {
    long now = clock.getTime();
    lastPreemptionUpdateTime = now;
    for (TaskType type: MAP_AND_REDUCE) {
      for (PoolSchedulable sched: getPoolSchedulables(type)) {
        if (!isStarvedForMinShare(sched)) {
          sched.setLastTimeAtMinShare(now);
        }
        if (!isStarvedForFairShare(sched)) {
          sched.setLastTimeAtHalfFairShare(now);
        }
        eventLog.log(“PREEMPT_VARS”, sched.getName(), type,
            now – sched.getLastTimeAtMinShare(),
            now – sched.getLastTimeAtHalfFairShare());
      }
    }
  }

到此,整个 FairScheduler 的任务初始化操作或者说 JobListener 的 jobAdded()方法完成了,分析的有遗漏,也许还有错误,如您发现请不吝赐教,谢谢

 

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计16259字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中