Showing posts with label FairScheduler. Show all posts

Wednesday, November 5, 2014

Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption

Since preemption in Hadoop-2.2.0 is documented as experimental, we will run an experiment to see how it actually behaves.

Our Allocation File (fair-scheduler.xml) is configured as follows:
<?xml version="1.0" encoding="utf-8"?>

<allocations>
  <queue name="default">
    <minResources>6000 mb,12vcores</minResources> 
    <maxResources>300000 mb,48vcores</maxResources> 
    <maxRunningApps>60</maxRunningApps> 
    <weight>1.0</weight> 
    <schedulingPolicy>fair</schedulingPolicy> 
    <minSharePreemptionTimeout>1</minSharePreemptionTimeout>
  </queue> 
  <queue name="supertool">
    <minResources>18000 mb,36vcores</minResources> 
    <maxResources>30000 mb,48vcores</maxResources> 
    <maxRunningApps>60</maxRunningApps> 
    <weight>1.0</weight> 
    <schedulingPolicy>fair</schedulingPolicy> 
    <minSharePreemptionTimeout>1</minSharePreemptionTimeout>
  </queue> 
  <userMaxAppsDefault>5</userMaxAppsDefault> 
  <fairSharePreemptionTimeout>1</fairSharePreemptionTimeout> 
  <defaultQueueSchedulingPolicy>fifo</defaultQueueSchedulingPolicy>
</allocations>

As we can see, there are two queues, namely, root.default and root.supertool.

The total memory in the cluster is 24GB, so maxResources for both queues is set no lower than the cluster total and never acts as a limit in this experiment.

MinResources of root.supertool is 18GB, whereas that of root.default is 6GB. The former is three times the latter.

Both fairSharePreemptionTimeout and minSharePreemptionTimeout are set to 1 second, so when preemption is needed, it is triggered as quickly as possible.

Our test case script is as below:
#!/bin/bash

source ~/.profile #load environment variable: HADOOP_HOME

#start the pi-calculation MapReduce application on root.default 4 times
#('> file 2>&1' sends both stdout and stderr to the output file)
nohup hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi -Dmapred.job.queue.name=root.default 12 1000000000 > nonono1.out 2>&1 &
nohup hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi -Dmapred.job.queue.name=root.default 12 1000000000 > nonono2.out 2>&1 &
nohup hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi -Dmapred.job.queue.name=root.default 12 1000000000 > nonono3.out 2>&1 &
nohup hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi -Dmapred.job.queue.name=root.default 12 1000000000 > nonono4.out 2>&1 &

#sleep some seconds waiting for the above applications to start completely
echo "sleep 60 seconds."
sleep 60

#start pi-calculation MapReduce application on root.supertool
nohup hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi -Dmapred.job.queue.name=root.supertool 12 1000000000 > nononom.out 2>&1 &

echo "done!"

First, we start 4 MapReduce applications in root.default. After 60 seconds, which gives those 4 applications enough time to start fully, another MapReduce application is launched in root.supertool.

Case 1: Preemption Off


<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>false</value>
</property>

root.default occupies 100% of the cluster's resources once the first 4 applications are running.



After 60 seconds, when the application submitted to root.supertool starts, resources released normally by finishing mappers or reducers in root.default are allocated preferentially to root.supertool, because root.supertool is far below its FairShare. With preemption turned off, however, the application in root.supertool cannot take resources away from running containers in root.default.

Here are several snapshots of the YARN monitor webpage in chronological order:





As we can see, the ratio of resource occupation between root.supertool and root.default is 66.7/33.3≈2, which falls short of the 18GB/6GB=3 ratio configured in fair-scheduler.xml.

FYI, the dashed box to the right of the green bar for root.supertool represents the FairShare of that queue.



Case 2: Preemption On


<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>

This time, root.default still occupies 100% of the cluster's resources once the first 4 applications are running.

When the subsequent application is submitted to root.supertool after 60 seconds, preemption is on and root.supertool is below its FairShare, so the scheduler preempts resources from the applications in root.default. The related log of the ApplicationMaster for one of the applications in root.default is as follows:


From the log, we can see that preemption is transparent to the user. When a container is preempted, records like 'org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1414999804378_0022_m_000000_0: Container preempted by scheduler' are printed to the log.

Again, the snapshots on YARN monitor webpage in chronological order:



This time, when the application in root.supertool is launched, resources are preempted from the applications in root.default until root.supertool reaches its FairShare. The ratio of resource occupation between root.supertool and root.default is 75%/25%=3, which matches the 18GB/6GB=3 ratio set in fair-scheduler.xml.

In conclusion, with preemption off, an application in root.supertool will starve if all the mappers and reducers of the applications in root.default run for a very long time, say a week. With preemption on, every queue is guaranteed at least its MinResources for running its applications, so no application starves and resources are shared fairly.


© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If transfering, please annotate the origin: Jason4Zhu

Fair Scheduler In YARN, Hadoop-2.2.0 - Deep Into Code

In this post, we'll walk through the source code of Fair Scheduler, pick out the key points, and build an overall picture of how Fair Scheduler works.

Step 1: Create FairScheduler


First, YARN reads the parameter 'yarn.resourcemanager.scheduler.class' to decide which scheduler to use.
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

The relevant code is at 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.createScheduler()'
protected ResourceScheduler createScheduler() {
  // Read parameter 'yarn.resourcemanager.scheduler.class' to get the scheduler class name.
  String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  try {
    // Load the class by name and instantiate it reflectively.
    Class<?> schedulerClazz = Class.forName(schedulerClassName);
    return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
        this.conf);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + schedulerClassName, e);
  }
}
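The reflective lookup above can be tried without a Hadoop installation. The following is a minimal sketch using only the JDK; DemoScheduler, DemoFifoScheduler, DemoFairScheduler and SchedulerFactorySketch are hypothetical stand-ins invented for illustration, and a plain Map plays the role of the YARN configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for YARN's ResourceScheduler interface.
interface DemoScheduler {
    String name();
}

// Hypothetical scheduler implementations, for illustration only.
class DemoFifoScheduler implements DemoScheduler {
    public String name() { return "fifo"; }
}

class DemoFairScheduler implements DemoScheduler {
    public String name() { return "fair"; }
}

final class SchedulerFactorySketch {
    static final String KEY = "yarn.resourcemanager.scheduler.class";

    // Read the class name from the configuration (with a default),
    // load it by name and instantiate it through its no-arg constructor.
    static DemoScheduler create(Map<String, String> conf) {
        String className = conf.getOrDefault(KEY, DemoFifoScheduler.class.getName());
        try {
            Class<?> clazz = Class.forName(className);
            if (!DemoScheduler.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " is not a DemoScheduler");
            }
            return (DemoScheduler) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate scheduler: " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(create(conf).name()); // prints "fifo" (the default)
        conf.put(KEY, DemoFairScheduler.class.getName());
        System.out.println(create(conf).name()); // prints "fair"
    }
}
```

Choosing the implementation by class name keeps the caller independent of any concrete scheduler, which is why switching schedulers in YARN only requires a configuration change.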

After creating FairScheduler, the FairScheduler.reinitialize() method will be invoked from 'org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceInit()'.
try {
  this.scheduler.reinitialize(conf, this.rmContext);
} catch (IOException ioe) {
  throw new RuntimeException("Failed to initialize scheduler", ioe);
}

Step 2: Main Logic Of FairScheduler


For FairScheduler.reinitialize():
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
    throws IOException {
  //Initialize parameters related to FairScheduler from yarn-site.xml
  userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
  nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
  rackLocalityThreshold = this.conf.getLocalityThresholdRack();
  preemptionEnabled = this.conf.getPreemptionEnabled();
  preemptionInterval = this.conf.getPreemptionInterval();
  waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
  ……
 
  if (!initialized) {
    //Initialize QueueManager
    queueMgr.initialize();

    //Initialize UpdateThread
    Thread updateThread = new Thread(new UpdateThread());
    updateThread.setName("FairSchedulerUpdateThread");
    updateThread.setDaemon(true);
    updateThread.start();
  } else {
    //Reload Allocation file (fair-scheduler.xml)
    queueMgr.reloadAllocs();
  }
}

UpdateThread is a daemon thread whose loop wakes up every UPDATE_INTERVAL milliseconds.
/**
 * A runnable which calls {@link FairScheduler#update()} every
 * <code>UPDATE_INTERVAL</code> milliseconds.
 */
private class UpdateThread implements Runnable {
  public void run() {
    while (true) {
      try {
        Thread.sleep(UPDATE_INTERVAL);
        //Call FairScheduler.update()
        update();
        //The entry point for preemption
        preemptTasksIfNecessary();
      } catch (Exception e) {
        LOG.error("Exception in fair scheduler UpdateThread", e);
      }
    }
  }
}

Looking into FairScheduler.update(), several things are done here: the Allocation File (fair-scheduler.xml) is reloaded if necessary, the demand of each queue and of each application in it is calculated recursively, and the FairShare of each queue is calculated recursively.
/**
 * Recompute the internal variables used by the scheduler - per-job weights,
 * fair shares, deficits, minimum slot allocations, and amount of used and
 * required resources per job.
 */
protected synchronized void update() {
  queueMgr.reloadAllocsIfNecessary(); // Reload allocation file
  updateRunnability(); // Set job runnability based on user/queue limits
  updatePreemptionVariables(); // Determine if any queues merit preemption

  FSQueue rootQueue = queueMgr.getRootQueue();

  // Recursively update demands for all queues
  rootQueue.updateDemand();

  rootQueue.setFairShare(clusterCapacity);
  // Recursively compute fair shares for all queues
  // and update metrics
  rootQueue.recomputeShares();
}

Step 3: Main Logic Of QueueManager


QueueManager loads and maintains all the information about queues that is configured in the Allocation File (fair-scheduler.xml).

QueueManager.initialize() is shown below. Inside it, reloadAllocs() calls QueueManager.loadQueue() recursively (to support hierarchical queues) in order to load all the queues.
public void initialize() throws IOException, SAXException,
    AllocationConfigurationException, ParserConfigurationException {
  FairSchedulerConfiguration conf = scheduler.getConf();
  // Create queue 'root'
  rootQueue = new FSParentQueue("root", this, scheduler, null);
  queues.put(rootQueue.getName(), rootQueue);
 
  this.allocFile = conf.getAllocationFile();
 
  // Load fair-scheduler.xml
  reloadAllocs();
  lastSuccessfulReload = scheduler.getClock().getTime();
  lastReloadAttempt = scheduler.getClock().getTime();

  // Create the default queue
  getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}

Leaf queues, parent queues (which have one or more child queues), and applications are abstracted as FSLeafQueue, FSParentQueue and AppSchedulable respectively, all of which extend 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable'. This is the composite design pattern.

For the rootQueue.updateDemand() call above, the code shows that updateDemand() is invoked on FSParentQueue first, then on FSLeafQueue, and finally on the applications.
@Override
public void FSParentQueue.updateDemand() {
  // Compute demand by iterating through apps in the queue
  // Limit demand to maxResources
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Iterate through child queues and add each child's demand to this queue's demand.
  for (FSQueue childQueue : childQueues) {
    childQueue.updateDemand();
    Resource toAdd = childQueue.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

@Override
public void FSLeafQueue.updateDemand() {
  // Compute demand by iterating through apps in the queue
  // Limit demand to maxResources
  Resource maxRes = queueMgr.getMaxResources(getName());
  demand = Resources.createResource(0);
  // Iterate through applications and add each application's demand to this queue's demand.
  for (AppSchedulable sched : appScheds) {
    sched.updateDemand();
    Resource toAdd = sched.getDemand();
    demand = Resources.add(demand, toAdd);
    demand = Resources.componentwiseMin(demand, maxRes);
    if (Resources.equals(demand, maxRes)) {
      break;
    }
  }
}

@Override
public void AppSchedulable.updateDemand() {
  // calculate demand
  demand = Resources.createResource(0);
  // Demand is current consumption plus outstanding requests
  Resources.addTo(demand, app.getCurrentConsumption());

  // Add up outstanding resource requests
  for (Priority p : app.getPriorities()) {
    for (ResourceRequest r : app.getResourceRequests(p).values()) {
      Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
      Resources.addTo(demand, total);
    }
  }
}
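The recursive roll-up above can be reproduced in a few lines. This is a simplified sketch, not the Hadoop classes: it tracks memory only (a long in MB instead of Resource), and DemandNode, AppNode and QueueNode are hypothetical names standing in for Schedulable, AppSchedulable and the two queue classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, memory-only stand-in for Schedulable.
interface DemandNode {
    long updateDemand(); // recompute and return demand in MB
}

// Stand-in for AppSchedulable.
final class AppNode implements DemandNode {
    private final long usedMb;
    private final long pendingMb;

    AppNode(long usedMb, long pendingMb) {
        this.usedMb = usedMb;
        this.pendingMb = pendingMb;
    }

    // Demand is current consumption plus outstanding requests.
    public long updateDemand() {
        return usedMb + pendingMb;
    }
}

// Stand-in for both FSParentQueue and FSLeafQueue: children may be queues or apps.
final class QueueNode implements DemandNode {
    private final long maxMb;
    private final List<DemandNode> children = new ArrayList<>();

    QueueNode(long maxMb) {
        this.maxMb = maxMb;
    }

    QueueNode add(DemandNode child) {
        children.add(child);
        return this;
    }

    // Sum the children's demand, capped at this queue's maxResources.
    public long updateDemand() {
        long demand = 0;
        for (DemandNode child : children) {
            demand = Math.min(demand + child.updateDemand(), maxMb);
            if (demand == maxMb) {
                break; // already at the cap, no need to visit the rest
            }
        }
        return demand;
    }
}

final class DemandSketch {
    public static void main(String[] args) {
        QueueNode leafA = new QueueNode(30000).add(new AppNode(4000, 2000)).add(new AppNode(1000, 500));
        QueueNode leafB = new QueueNode(5000).add(new AppNode(3000, 4000));
        QueueNode root = new QueueNode(24000).add(leafA).add(leafB);
        System.out.println(leafA.updateDemand()); // 7500
        System.out.println(leafB.updateDemand()); // 5000 (7000 capped at the queue maximum)
        System.out.println(root.updateDemand());  // 12500
    }
}
```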

Likewise, rootQueue.recomputeShares() follows almost the same design pattern and logic as rootQueue.updateDemand().
@Override
public void FSParentQueue.recomputeShares() {
  // Compute FairShare for each child queue on the basis of FairShare of current queue.
  policy.computeShares(childQueues, getFairShare());
  for (FSQueue childQueue : childQueues) {
    childQueue.getMetrics().setFairShare(childQueue.getFairShare());
    // Invoke recomputeShares() method respectively on all child queues.
    childQueue.recomputeShares();
  }
}

@Override
public void FSLeafQueue.recomputeShares() {
  // Compute FairShare for each application on the basis of FairShare of current queue.
  policy.computeShares(getAppSchedulables(), getFairShare());
}

The SchedulingPolicy mentioned above, whose purpose is to calculate a specific resource amount (for FairScheduler, the FairShare), has three implementation classes: DominantResourceFairnessPolicy, FairSharePolicy and FifoPolicy.


Step 4: Preemption In Fair Scheduler


The entry is at 'FairScheduler.preemptTasksIfNecessary()'.
/**
 * Check for queues that need tasks preempted, either because they have been
 * below their guaranteed share for minSharePreemptionTimeout or they have
 * been below half their fair share for the fairSharePreemptionTimeout. If
 * such queues exist, compute how many tasks of each type need to be preempted
 * and then select the right ones using preemptTasks.
 */
protected synchronized void preemptTasksIfNecessary() {
  // Check whether preemption is on via parameter 'yarn.scheduler.fair.preemption' in yarn-site.xml
  if (!preemptionEnabled) {
    return;
  }

  long curTime = clock.getTime();
  // Check on the preemption interval
  if (curTime - lastPreemptCheckTime < preemptionInterval) {
    return;
  }
  lastPreemptCheckTime = curTime;

  Resource resToPreempt = Resources.none();

  for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
    // Sum up the amount of resource needed to preempt
    resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); 
  }
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, Resources.none())) {
    // Start to preempt
    preemptResources(queueMgr.getLeafQueues(), resToPreempt);
  }
}

Here, resToPreempt() calculates how much resource a queue is entitled to preempt.
/**
 * Return the resource amount that this queue is allowed to preempt, if any.
 * If the queue has been below its min share for at least its preemption
 * timeout, it should preempt the difference between its current share and
 * this min share. If it has been below half its fair share for at least the
 * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
 * full fair share. If both conditions hold, we preempt the max of the two
 * amounts (this shouldn't happen unless someone sets the timeouts to be
 * identical for some reason).
 */
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  String queue = sched.getName();
  long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
  long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
  Resource resDueToMinShare = Resources.none();
  Resource resDueToFairShare = Resources.none();
  //Min Share Preemption
  //MAX(0, MIN(minShare, demand)-haveUsed)
  if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, sched.getMinShare(), sched.getDemand());
    resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  //Fair Share Preemption
  //MAX(0, MIN(fairShare, demand)-haveUsed)
  if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, sched.getFairShare(), sched.getDemand());
    resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  //MAX(Min Share Preemption, Fair Share Preemption)
  Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, resDueToMinShare, resDueToFairShare);
 
  return resToPreempt;
}
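The arithmetic in resToPreempt() is easier to follow with plain numbers. Below is a memory-only sketch under stated assumptions: all amounts are longs in MB, the two booleans replace the timeout checks, and PreemptAmountSketch and its method names are hypothetical.

```java
final class PreemptAmountSketch {
    // max(0, min(share, demand) - usage), all values in MB.
    static long shortfall(long shareMb, long demandMb, long usedMb) {
        return Math.max(0L, Math.min(shareMb, demandMb) - usedMb);
    }

    // Take the larger of the min-share shortfall and the fair-share shortfall,
    // counting each one only if its timeout has expired.
    static long resToPreempt(long minShareMb, long fairShareMb, long demandMb, long usedMb,
                             boolean minShareTimedOut, boolean fairShareTimedOut) {
        long dueToMinShare = minShareTimedOut ? shortfall(minShareMb, demandMb, usedMb) : 0L;
        long dueToFairShare = fairShareTimedOut ? shortfall(fairShareMb, demandMb, usedMb) : 0L;
        return Math.max(dueToMinShare, dueToFairShare);
    }

    public static void main(String[] args) {
        // A queue with an 18000 MB min share that uses nothing and demands 13000 MB (assumed figures).
        System.out.println(resToPreempt(18000, 18000, 13000, 0, true, true));     // 13000
        // Demand above the share: the target is capped at the share.
        System.out.println(resToPreempt(18000, 18000, 30000, 4000, true, false)); // 14000
        // No timeout has expired yet: nothing to preempt.
        System.out.println(resToPreempt(18000, 18000, 30000, 4000, false, false)); // 0
    }
}
```

Capping the target at the demand means a queue never preempts more than it can actually use.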

The main logic of preemption is at 'FairScheduler.preemptResources()'.
/**
 * Preempt a quantity of resources from a list of QueueSchedulables. The
 * policy for this is to pick apps from queues that are over their fair share,
 * but make sure that no queue is placed below its fair share in the process.
 * We further prioritize preemption by choosing containers with lowest
 * priority to preempt.
 */
protected void preemptResources(Collection<FSLeafQueue> scheds, Resource toPreempt) {
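  // Collect running containers from over-scheduled queues
  List<RMContainer> runningContainers = new ArrayList<RMContainer>();
  // Remember which application and queue each candidate container belongs to
  Map<RMContainer, FSSchedulerApp> apps = new HashMap<RMContainer, FSSchedulerApp>();
  Map<RMContainer, FSLeafQueue> queues = new HashMap<RMContainer, FSLeafQueue>();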

  for (FSLeafQueue sched : scheds) {
    // Collect the live containers of every queue whose usage is above its FairShare as preemption candidates.
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, sched.getResourceUsage(), sched.getFairShare())) {
      for (AppSchedulable as : sched.getAppSchedulables()) {
        for (RMContainer c : as.getApp().getLiveContainers()) {
          runningContainers.add(c);
          apps.put(c, as.getApp());
          queues.put(c, sched);
        }
      }
    }
  }

  // Sort containers into reverse order of priority
  Collections.sort(runningContainers, new Comparator<RMContainer>() {
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  });
 
  // Scan down the list of containers we've already warned and kill them
  // if we need to.  Remove any containers from the list that we don't need
  // or that are no longer running.
  Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
  while (warnedIter.hasNext()) {
    RMContainer container = warnedIter.next();
    if (container.getState() == RMContainerState.RUNNING &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt, Resources.none())) {
      warnOrKillContainer(container, apps.get(container), queues.get(container));
      preemptedThisRound.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    } else {
      warnedIter.remove();
    }
  }

  // Scan down the rest of the containers until we've preempted enough, making
  // sure we don't preempt too many from any queue
  Iterator<RMContainer> runningIter = runningContainers.iterator();
  while (runningIter.hasNext() &&
    Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, toPreempt, Resources.none())) {
    RMContainer container = runningIter.next();
    FSLeafQueue sched = queues.get(container);
    if (!preemptedThisRound.contains(container) &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, sched.getResourceUsage(), sched.getFairShare())) {
      warnOrKillContainer(container, apps.get(container), sched);
     
      warnedContainers.add(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    }
  }
}

The final kill logic is in FairScheduler.warnOrKillContainer(). As stated in the official documentation, preemption in Hadoop-2.2.0 is experimental, which is also visible in this method (note the TODO comment):
private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, FSLeafQueue queue) {
  Long time = app.getContainerPreemptionTime(container);

  if (time != null) {
    // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
    // proceed with kill
    if (time + waitTimeBeforeKill < clock.getTime()) {
      ContainerStatus status =
        SchedulerUtils.createPreemptedContainerStatus(
          container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

      // TODO: Not sure if this ever actually adds this to the list of cleanup
      // containers on the RMNode (see SchedulerNode.releaseContainer()).
      completedContainer(container, status, RMContainerEventType.KILL);
      LOG.info("Killing container" + container +
          " (after waiting for premption for " +
          (clock.getTime() - time) + "ms)");
    }
  } else {
    // track the request in the FSSchedulerApp itself
    app.addPreemption(container, clock.getTime());
  }
}
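The two-phase behaviour of warnOrKillContainer() (record the time on the first call, kill only after the grace period) can be sketched independently of YARN. In this sketch containers are plain String ids, the clock is passed in as a parameter, and the class name WarnOrKillSketch and the 15000 ms grace period are assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class WarnOrKillSketch {
    private final long waitBeforeKillMs;
    // Time at which each container was first marked for preemption.
    private final Map<String, Long> firstWarnedAt = new HashMap<>();

    WarnOrKillSketch(long waitBeforeKillMs) {
        this.waitBeforeKillMs = waitBeforeKillMs;
    }

    // Returns true when the container should be killed on this call.
    boolean warnOrKill(String containerId, long nowMs) {
        Long warnedAt = firstWarnedAt.get(containerId);
        if (warnedAt == null) {
            // First sighting: only record the time, do not kill yet.
            firstWarnedAt.put(containerId, nowMs);
            return false;
        }
        // Kill only once the grace period has fully elapsed.
        return warnedAt + waitBeforeKillMs < nowMs;
    }

    public static void main(String[] args) {
        WarnOrKillSketch sketch = new WarnOrKillSketch(15000);
        System.out.println(sketch.warnOrKill("container_1", 0));     // false (warned)
        System.out.println(sketch.warnOrKill("container_1", 10000)); // false (still waiting)
        System.out.println(sketch.warnOrKill("container_1", 15001)); // true (grace period over)
    }
}
```

Because a container is only killed on a later pass, at least two rounds of the preemption check are needed before any container actually dies.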

According to our experiment (related post: Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption), preemption does work in Hadoop-2.2.0.

Lastly, here's a sequence diagram for all essential invocations at the scope of FairScheduler:




Tuesday, November 4, 2014

Fair Scheduler In YARN, Hadoop-2.2.0 - Overall Introduction


Part 1. Introduction


Fair Scheduler is one of the schedulers in the ResourceManager of YARN. Quoting the official documentation, fair scheduling is a method of assigning resources to jobs such that all jobs get, on average, an equal share of resources over time. When there is a single job running, that job uses the entire cluster. When other jobs are submitted, task slots that free up are assigned to the new jobs, so that each job gets roughly the same amount of CPU time. Unlike the default Hadoop scheduler (Fifo Scheduler), which forms a queue of jobs, this lets short jobs finish in reasonable time while not starving long jobs. It is also an easy way to share a cluster between multiple users.

Part 2. Configuration


To apply Fair Scheduler, we can simply add the following to yarn-site.xml:
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

After switching to Fair Scheduler, we can apply some auxiliary parameters located in yarn-site.xml. They are well explained in the official document:

yarn.scheduler.fair.allocation.file
Path to allocation file. An allocation file is an XML manifest describing queues and their properties, in addition to certain policy defaults. This file must be in the XML format described in the next section. If a relative path is given, the file is searched for on the classpath (which typically includes the Hadoop conf directory). Defaults to fair-scheduler.xml.

yarn.scheduler.fair.user-as-default-queue
Whether to use the username associated with the allocation as the default queue name, in the event that a queue name is not specified. If this is set to "false" or unset, all jobs have a shared default queue, named "default". Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored.

yarn.scheduler.fair.preemption
Whether to use preemption. Note that preemption is experimental in the current version. Defaults to false.
Source code of preemption is analyzed in Fair Scheduler In YARN, Hadoop-2.2.0 - Deep Into Code, and an experiment on it is shown in Fair Scheduler In YARN, Hadoop-2.2.0 - Experiment On Preemption.

yarn.scheduler.fair.sizebasedweight
Whether to assign shares to individual apps based on their size, rather than providing an equal share to all apps regardless of size. When set to true, apps are weighted by the natural logarithm of one plus the app's total requested memory, divided by the natural logarithm of 2. Defaults to false.

yarn.scheduler.fair.assignmultiple
Whether to allow multiple container assignments in one heartbeat. Defaults to false.

yarn.scheduler.fair.max.assign
If assignmultiple is true, the maximum amount of containers that can be assigned in one heartbeat. Defaults to -1, which sets no limit.

yarn.scheduler.fair.locality.threshold.node
For applications that request containers on particular nodes, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another node. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities.

yarn.scheduler.fair.locality.threshold.rack
For applications that request containers on particular racks, the number of scheduling opportunities since the last container assignment to wait before accepting a placement on another rack. Expressed as a float between 0 and 1, which, as a fraction of the cluster size, is the number of scheduling opportunities to pass up. The default value of -1.0 means don't pass up any scheduling opportunities.

yarn.scheduler.fair.allow-undeclared-pools
If this is true, new queues can be created at application submission time, whether because they are specified as the application's queue by the submitter or because they are placed there by the user-as-default-queue property. If this is false, any time an app would be placed in a queue that is not specified in the allocations file, it is placed in the "default" queue instead. Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored.
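As a small illustration of the 'yarn.scheduler.fair.sizebasedweight' description above, the weight formula can be written out directly. This is only a sketch of the stated formula; the class name SizeBasedWeightSketch is hypothetical, and treating the requested memory as a number of MB is an assumption.

```java
final class SizeBasedWeightSketch {
    // weight = ln(1 + requestedMemory) / ln(2), i.e. log2(1 + requestedMemory)
    static double weight(long requestedMemory) {
        return Math.log1p(requestedMemory) / Math.log(2);
    }

    public static void main(String[] args) {
        System.out.println(weight(0));       // 0.0: an app requesting nothing has no weight
        System.out.println(weight(1023));    // approximately 10
        System.out.println(weight(1048575)); // approximately 20
    }
}
```

Because the weight grows logarithmically, an application that requests about a thousand times more memory gets only about twice the weight.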

The 'Allocation File' specified by 'yarn.scheduler.fair.allocation.file' must be in XML format. Its elements are as follows:

Queue elements
Which represent queues. Each may contain the following properties:

minResources: minimum resources the queue is entitled to, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. If a queue's minimum share is not satisfied, it will be offered available resources before any other queue under the same parent. Under the single-resource fairness policy, a queue is considered unsatisfied if its memory usage is below its minimum memory share. Under dominant resource fairness, a queue is considered unsatisfied if its usage for its dominant resource with respect to the cluster capacity is below its minimum share for that resource. If multiple queues are unsatisfied in this situation, resources go to the queue with the smallest ratio between relevant resource usage and minimum. Note that it is possible that a queue that is below its minimum may not immediately get up to its minimum when it submits an application, because already-running jobs may be using those resources.

maxResources: maximum resources a queue is allowed, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. A queue will never be assigned a container that would put its aggregate usage over this limit.

maxRunningApps: limit the number of apps from the queue to run at once

weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight.

schedulingPolicy: to set the scheduling policy of any queue. The allowed values are "fifo"/"fair"/"drf" or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy. Defaults to "fair". If "fifo", apps with earlier submit times are given preference for containers, but apps submitted later may run concurrently if there is leftover space on the cluster after satisfying the earlier app's requests.
This determines how applications within this queue are scheduled. Fair scheduling policy is always applied between queues.

aclSubmitApps: a list of users and/or groups that can submit apps to the queue. Refer to the ACLs section below for more info on the format of this list and how queue ACLs work.

aclAdministerApps: a list of users and/or groups that can administer a queue. Currently the only administrative action is killing an application. Refer to the ACLs section below for more info on the format of this list and how queue ACLs work.

minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues.

User elements
which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user.

A userMaxAppsDefault element
which sets the default running app limit for any users whose limit is not otherwise specified.

A fairSharePreemptionTimeout element
number of seconds a queue is under its fair share before it will try to preempt containers to take resources from other queues.
This parameter is valid provided 'yarn.scheduler.fair.preemption' in yarn-site.xml is set to true.

A defaultMinSharePreemptionTimeout element
which sets the default number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues; overridden by minSharePreemptionTimeout element in each queue if specified.

A queueMaxAppsDefault element
which sets the default running app limit for queues; overridden by maxRunningApps element in each queue.

A defaultQueueSchedulingPolicy element
which sets the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified. Defaults to "fair".

A queuePlacementPolicy element
which contains a list of rule elements that tell the scheduler how to place incoming apps into queues. Rules are applied in the order that they are listed. Rules may take arguments. All rules accept the "create" argument, which indicates whether the rule can create a new queue. "Create" defaults to true; if set to false and the rule would place the app in a queue that is not configured in the allocations file, we continue on to the next rule. The last rule must be one that can never issue a continue. Valid rules are:

specified: the app is placed into the queue it requested. If the app requested no queue, i.e. it specified "default", we continue.

user: the app is placed into a queue with the name of the user who submitted it.

primaryGroup: the app is placed into a queue with the name of the primary group of the user who submitted it.

secondaryGroupExistingQueue: the app is placed into a queue with a name that matches a secondary group of the user who submitted it. The first secondary group that matches a configured queue will be selected.

default: the app is placed into the queue named "default".

reject: the app is rejected.

An example of Allocation File is as follows:
<?xml version="1.0"?>
<allocations>
  <queue name="sample_queue">
    <minResources>10000 mb,0vcores</minResources>
    <maxResources>90000 mb,0vcores</maxResources>
    <maxRunningApps>50</maxRunningApps>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
    <queue name="sample_sub_queue">
      <aclSubmitApps>charlie</aclSubmitApps>
      <minResources>5000 mb,0vcores</minResources>
    </queue>
  </queue>
 
  <user name="sample_user">
    <maxRunningApps>30</maxRunningApps>
  </user>
  <userMaxAppsDefault>5</userMaxAppsDefault>
 
  <queuePlacementPolicy>
    <rule name="specified" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" />
  </queuePlacementPolicy>
</allocations>
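To make the rule order concrete, here is a minimal Python sketch of how a queuePlacementPolicy like the one above could be evaluated. It is only an illustration of the rules as described, not the scheduler's code: the function place_app, its parameters, and the plain queue names (without the "root." prefix) are hypothetical, and treating an unmatched secondaryGroupExistingQueue rule as "continue" is our assumption.

```python
def place_app(requested_queue, user, primary_group, secondary_groups,
              configured_queues, rules):
    """Return the queue chosen for an app, or None if the app is rejected.

    rules is an ordered list of (rule_name, create) pairs (hypothetical shape).
    """
    for name, create in rules:
        if name == "reject":
            return None
        if name == "specified":
            # Requesting "default" is treated as requesting no queue at all.
            candidate = None if requested_queue == "default" else requested_queue
        elif name == "user":
            candidate = user
        elif name == "primaryGroup":
            candidate = primary_group
        elif name == "secondaryGroupExistingQueue":
            # First secondary group that matches a configured queue, if any.
            candidate = next(
                (g for g in secondary_groups if g in configured_queues), None)
        elif name == "default":
            candidate = "default"
        else:
            raise ValueError("unknown rule: " + name)
        if candidate is None:
            continue  # the rule does not apply, try the next one
        if create or candidate in configured_queues:
            return candidate
        # create is false and the queue is not configured: fall through
    raise ValueError("the last rule must be one that never continues")


# The policy from the example Allocation File above.
policy = [("specified", True), ("primaryGroup", False), ("default", True)]
queues = {"sample_queue"}

print(place_app("sample_queue", "charlie", "analysts", [], queues, policy))
print(place_app("default", "charlie", "analysts", [], queues, policy))
```

In the second call the app names no queue and its primary group has no configured queue, so with create="false" the primaryGroup rule continues and the app lands in "default".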

The parameters above are largely self-explanatory. When a parameter is changed while YARN is running, a change in yarn-site.xml takes effect only after YARN is restarted, whereas the Allocation File (fair-scheduler.xml) is reloaded periodically, so changes there are picked up without a restart.

Part 3. Fair Scheduler Algorithm

Fair Scheduler works with four quantities, namely, MinResources, MaxResources, Demand and FairShare. The first two are configured in the Allocation File (fair-scheduler.xml); the other two are calculated at runtime from the current state of resources in the cluster.

Demand is simple to calculate: according to the implementation code at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable.updateDemand()', demand is current consumption plus outstanding requests.
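As a rough illustration of that rule, the following Python sketch adds up demand for a single application. It tracks memory only, and the function name and the (container_mb, num_containers) request shape are hypothetical, chosen just for this example.

```python
def update_demand(current_consumption_mb, outstanding_requests):
    """Demand = current consumption + outstanding requests (memory only).

    outstanding_requests is a list of (container_mb, num_containers) pairs.
    """
    demand = current_consumption_mb
    for container_mb, num_containers in outstanding_requests:
        demand += container_mb * num_containers
    return demand


# 4 containers of 1024 MB are running; 8 more of 1024 MB and 1 of 2048 MB
# have been requested but not yet allocated.
print(update_demand(4 * 1024, [(1024, 8), (2048, 1)]))  # 14336
```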

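However, the calculation of FairShare is a little more intricate. The formula is as follows:

$$\sum_{i} \min\bigl(d_i,\ \max(r \cdot w_i,\ m_i)\bigr) = t$$

'd' is for demand, 'w' is for weight, 'm' is for MinResources, 't' is for total resources, and 'r' is the weight-to-resource ratio being solved for; the index i runs over the queues or applications that share the resources.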

There are two cases to analyze. To simplify the issue, let's assume the weight is always 1:

If demand is below MinResources, then FairShare will be equal to current demand. Although MinResources is not reached, there is no need to preempt as much as MinResources to complete the application, which sounds reasonable.

If demand is above MinResources, we will get at least MinResources for our applications to run. That is to say, we should always work out MinResources quantitatively for every queue, for it acts as the lower bound for all applications running in that queue.

What Fair Scheduler intends to achieve is to find a proper r that makes the equation true, that is to say, to find a FairShare for every application such that the shares sum up to all the resources in the cluster. The implementation code is at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares.computeShares()', whose comment is cited below:
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares to
* the Schedulables that respects their min and max shares.
* 
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of min and max shares. If there
* were no minimum or maximum shares, then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum and maximum shares add a
* further twist - Some Schedulables may have a min share higher than their
* assigned share or a max share lower than their assigned share.
* 
* To deal with these possibilities, we define an assignment of slots as being
* fair if there exists a ratio R such that: Schedulables S where S.minShare
* > R * S.weight are given share S.minShare - Schedulables S where S.maxShare
* < R * S.weight are given S.maxShare - All other Schedulables S are
* assigned share R * S.weight - The sum of all the shares is totalSlots.
* 
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
* 
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number of
* slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
* 
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* use more than totalResources resources). The helper method
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
* given value of R.
* 
* The running time of this algorithm is linear in the number of Schedulables,
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
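The binary search described in this comment can be sketched in a few lines of Python. This is a simplified illustration, not the Hadoop implementation: it handles a single resource (memory), assumes every weight is positive, and follows the two cases discussed above by capping a share at demand, so each share is min(d, max(r * w, m)). The function names and the fixed iteration count are our own choices.

```python
def share(demand, weight, min_share, r):
    """Share of one schedulable for a given ratio r: min(d, max(r * w, m))."""
    return min(demand, max(r * weight, min_share))


def compute_fair_shares(schedulables, total, iterations=25):
    """schedulables is a list of (demand, weight, min_share) tuples."""
    def used(r):
        return sum(share(d, w, m, r) for d, w, m in schedulables)

    # There is no point in handing out more than the total demand.
    total = min(total, sum(d for d, _, _ in schedulables))

    # Upper bound: keep doubling r until the shares use at least `total`.
    hi = 1.0
    while used(hi) < total:
        hi *= 2.0

    # Binary search for the ratio at which the shares sum up to `total`.
    lo = 0.0
    for _ in range(iterations):
        mid = (lo + hi) / 2.0
        if used(mid) < total:
            lo = mid  # r is too small, more could be assigned
        else:
            hi = mid  # r is large enough (or too large)
    return [share(d, w, m, hi) for d, w, m in schedulables]


# Three schedulables with weight 1 sharing 24000 MB:
# (demand, weight, MinResources)
shares = compute_fair_shares(
    [(2000, 1.0, 6000), (30000, 1.0, 4000), (30000, 1.0, 12000)], 24000)
print([round(s) for s in shares])  # [2000, 10000, 12000]
```

The first schedulable demands less than its MinResources, so its share equals its demand. The third is held at its MinResources because r * w settles at about 10000, below 12000, and the second receives the remaining 10000.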

When there are plenty of available resources in the cluster, a newly started application will take up as many resources as possible so long as it stays below its MaxResources. However, as other applications come in, Fair Scheduler will calculate a FairShare for every application and queue.

If preemption is off, resources released in the future by currently running applications will be scheduled preferentially to the applications whose occupied resources are below their FairShare. Whereas if preemption is on, some resources will be preempted from currently running applications whose occupied resources are above their FairShare. Either way, Fair Scheduler will schedule resources among queues and among applications 'fairly' to the greatest extent.

The amount to preempt is the maximum of the MinShare preemption amount and the FairShare preemption amount. The corresponding implementation code is at 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.resToPreempt()'.
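A minimal Python sketch of this rule follows. It is based on our reading of resToPreempt() rather than copied from it, so treat the details as assumptions: each amount is taken as the shortfall of current usage against the smaller of the target (MinShare or FairShare) and demand, and it only counts once the queue has been starved for longer than the corresponding preemption timeout. The sketch tracks memory only, and all names are hypothetical.

```python
def res_to_preempt(usage, demand, min_share, fair_share,
                   starved_min_secs, starved_fair_secs,
                   min_timeout_secs, fair_timeout_secs):
    """Amount of memory (MB) a queue may claim through preemption."""
    due_to_min_share = 0
    due_to_fair_share = 0
    # MinShare preemption counts only after minSharePreemptionTimeout.
    if starved_min_secs > min_timeout_secs:
        due_to_min_share = max(0, min(min_share, demand) - usage)
    # FairShare preemption counts only after fairSharePreemptionTimeout.
    if starved_fair_secs > fair_timeout_secs:
        due_to_fair_share = max(0, min(fair_share, demand) - usage)
    return max(due_to_min_share, due_to_fair_share)


# A queue using 6000 MB with MinResources 18000 MB, a FairShare of 19000 MB
# and a demand of 20000 MB, starved for 5 seconds with both timeouts at 1 second.
print(res_to_preempt(6000, 20000, 18000, 19000, 5, 5, 1, 1))  # 13000
```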




© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If transferring, please annotate the origin: Jason4Zhu