Code Review of Spark on Yarn - Client Mode

Posted by CodingCat on March 15, 2015

What I will/won’t talk about

In this article, I will go through the code of Spark on Yarn (client mode) to present the interaction between its components. To keep this from becoming a super-long article that scares off most readers, I will not go into much detail on Spark Core.

How Spark differentiates different deployment modes

The first question I would like to discuss in this article is how Spark differentiates applications running in different deployment modes.

The key difference among the various deployment modes lies in which TaskScheduler and SchedulerBackend instances are started in SparkContext.

As shown in the following code snippet (SparkContext.scala):


private def createTaskScheduler(sc: SparkContext, master: String): (SchedulerBackend, TaskScheduler) = {

  //...
  master match {
    case "local" =>
      //ignore
    case "yarn-client" =>
      val scheduler = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      scheduler.initialize(backend)
      (backend, scheduler)
    //ignore...
  }
}

The createTaskScheduler method is called to create the instances of SchedulerBackend and TaskScheduler:

  • SchedulerBackend is the component handling the communication among the Driver (the component starting the application), the Executors (the components running the actual tasks) and the Cluster Master (the component managing the resources in the cluster, e.g. the Master in standalone Spark, the ResourceManager in YARN).
  • TaskScheduler is the component deciding the resource allocation among tasks.
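The reason for the Class.forName dance in createTaskScheduler is that the YARN-specific classes live in a separate build module, so Spark Core cannot reference them at compile time. A minimal sketch of the pattern follows; the trait and function names here are illustrative, not Spark's actual API:

```scala
// Sketch of reflective instantiation: load a class by name so the
// calling code compiles without a hard dependency on the module that
// defines it. "Backend" and "loadBackend" are illustrative names.
trait Backend {
  def name: String
}

def loadBackend(className: String): Backend =
  try {
    val clazz = Class.forName(className)
    clazz.getConstructor().newInstance().asInstanceOf[Backend]
  } catch {
    case e: Exception =>
      // Mirrors the SparkException("YARN mode not available ?", e) above
      throw new RuntimeException(s"mode not available: $className", e)
  }
```

If the class is missing from the classpath (e.g. a Spark build without YARN support), the failure surfaces at runtime as the wrapped exception rather than as a compile error.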

SchedulerBackend & TaskScheduler in Spark on Yarn

In this section, I will introduce how Spark on Yarn (Client Mode) implements SchedulerBackend and TaskScheduler.

In client mode, SchedulerBackend is implemented by the class org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend and initialized via reflection in SparkContext; TaskScheduler is implemented by org.apache.spark.scheduler.cluster.YarnScheduler.

    case "yarn-client" =>
        val scheduler = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") // CodingCat: starts SchedulerBackend
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        (backend, scheduler)

How YarnScheduler and YarnClientSchedulerBackend work

The key methods of TaskScheduler are:

  • start(): performs initialization and starts the SchedulerBackend
  • submitTasks(): calls SchedulerBackend.reviveOffers() to dispatch tasks to the cluster
  • resourceOffers(): the key logic deciding which tasks should be selected for the newly available resources
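To make the reviveOffers/resourceOffers round-trip concrete, here is a toy sketch of a backend offering free cores and a scheduler matching pending tasks to them. These are my own simplified classes, not Spark's actual ones:

```scala
import scala.collection.mutable

// Toy model of the scheduling round-trip: the backend gathers free
// resources as WorkerOffers; the scheduler fills each offer with
// pending tasks and returns the launch decisions.
case class WorkerOffer(executorId: String, cores: Int)
case class TaskDesc(taskId: Int, executorId: String)

class ToyScheduler {
  private var pending = List(1, 2, 3) // task ids waiting to run

  // Called with the currently free resources; returns tasks to launch.
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[TaskDesc] = {
    val launched = mutable.Buffer[TaskDesc]()
    for (offer <- offers) {
      val picked = pending.take(offer.cores) // one core per task here
      pending = pending.drop(picked.size)
      launched ++= picked.map(TaskDesc(_, offer.executorId))
    }
    launched.toSeq
  }
}

class ToyBackend(scheduler: ToyScheduler) {
  // In Spark, reviveOffers() triggers a scheduling round like this one.
  def reviveOffers(): Seq[TaskDesc] =
    scheduler.resourceOffers(
      Seq(WorkerOffer("exec-1", 2), WorkerOffer("exec-2", 2)))
}
```

With three pending single-core tasks and two 2-core offers, the first two tasks land on exec-1 and the third on exec-2.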

YarnScheduler is a simple extension of TaskSchedulerImpl, the implementation of TaskScheduler in Spark Core. The only difference between YarnScheduler and TaskSchedulerImpl is that YarnScheduler overrides the getRackForHost method, which enables rack-aware task scheduling to minimize cross-rack traffic.
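A hypothetical sketch of what the extra getRackForHost lookup buys: the real YarnScheduler resolves racks through Hadoop's topology mechanism, while the hard-coded map below is just a stand-in:

```scala
// Stand-in host-to-rack topology; the real YarnScheduler resolves this
// through Hadoop's rack-resolution machinery, not a hard-coded map.
val rackByHost = Map(
  "host1" -> "/rack1",
  "host2" -> "/rack1",
  "host3" -> "/rack2")

// TaskSchedulerImpl's default returns no rack; the YARN version can answer.
def getRackForHost(host: String): Option[String] = rackByHost.get(host)

// With rack info, a task preferring host1 can still run rack-locally on
// host2 (same rack), avoiding cross-rack traffic, before falling back
// to an arbitrary host.
def isRackLocal(preferredHost: String, offeredHost: String): Boolean =
  (getRackForHost(preferredHost), getRackForHost(offeredHost)) match {
    case (Some(a), Some(b)) => a == b
    case _                  => false
  }
```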

YarnClientSchedulerBackend is an extension of CoarseGrainedSchedulerBackend:

  • In the start() method, it creates an org.apache.spark.deploy.yarn.Client instance, which wraps org.apache.hadoop.yarn.client.api.YarnClient to submit the application to the YARN cluster.
  • The key step in start() to launch the ApplicationMaster in the YARN cluster is calling createContainerLaunchContext, which builds the launch context of the container hosting the YARN Application Master; in client mode, the launch command it assembles starts org.apache.spark.deploy.yarn.ExecutorLauncher. ExecutorLauncher is a wrapper of org.apache.spark.deploy.yarn.ApplicationMaster, which contains the key logic to start the application.
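As a rough illustration of the mode-dependent main-class choice in the AM launch command (the real createContainerLaunchContext also assembles classpaths, JVM options and localized resources; the memory flag and argument handling below are simplified placeholders):

```scala
// Simplified sketch of the AM launch command: in client mode the main
// class is ExecutorLauncher, in cluster mode ApplicationMaster. The
// -Xmx flag and argument passing are placeholders, not Spark's logic.
def amCommand(clientMode: Boolean, amMemoryMb: Int, amArgs: Seq[String]): Seq[String] = {
  val mainClass =
    if (clientMode) "org.apache.spark.deploy.yarn.ExecutorLauncher"
    else "org.apache.spark.deploy.yarn.ApplicationMaster"
  Seq("java", s"-Xmx${amMemoryMb}m", mainClass) ++ amArgs
}
```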

How ApplicationMaster works

ApplicationMaster wraps a org.apache.spark.deploy.yarn.YarnRMClient which further wraps org.apache.hadoop.yarn.client.api.AMRMClient to request resources from YARN RM.

The key method in ApplicationMaster is run():

  • By calling registerAM, YarnRMClient registers the application master with the YARN RM and creates an org.apache.spark.deploy.yarn.YarnAllocator instance, another wrapper of org.apache.hadoop.yarn.client.api.AMRMClient, whose allocate* methods request resources from the RM (in the following code snippet, allocator.allocateResources()):
private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
    val sc = sparkContextRef.get()

    val appId = client.getAttemptId().getApplicationId().toString()
    val historyAddress =
      sparkConf.getOption("spark.yarn.historyServer.address")
        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
        .getOrElse("")

    allocator = client.register(yarnConf,
      if (sc != null) sc.getConf else sparkConf,
      if (sc != null) sc.preferredNodeLocationData else Map(),
      uiAddress,
      historyAddress,
      securityMgr)

    allocator.allocateResources()
    reporterThread = launchReporterThread()
  }
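The register-then-allocate flow above can be caricatured as follows. This is a toy allocator, not the real YarnAllocator/AMRMClient: each allocation round pretends the RM grants one container, and the reporter loop keeps asking until the target number of executors is reached:

```scala
// Toy allocator standing in for YarnAllocator: tracks how many
// executors are still missing and "receives" one container per round.
class ToyAllocator(val targetExecutors: Int) {
  private var running = 0
  def allocateResources(): Unit =
    if (running < targetExecutors) running += 1
  def missing: Int = targetExecutors - running
}

// Caricature of the reporter thread: repeatedly call allocateResources
// until all requested containers are granted (or we hit the round cap).
def reporterLoop(allocator: ToyAllocator, maxRounds: Int): Int = {
  var rounds = 0
  while (allocator.missing > 0 && rounds < maxRounds) {
    allocator.allocateResources()
    rounds += 1
  }
  rounds
}
```

The real reporter thread also relays container-completion events and sleeps between rounds; only the "keep polling the RM via the allocator" shape is kept here.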