How Spark Decides PreferredLocation for a Task

Posted by CodingCat on February 29, 2016

In this post, we will go through the Spark code to see how Spark decides the PreferredLocation of a task.

Data locality is a concept commonly used in data processing frameworks like Hadoop and Spark. It means that we prefer to place a computing task close to its input, so that we reduce the overhead of transferring the input data to the task. For example, in Hadoop we want to run a Mapper task on the server whose HDFS DataNode stores the input chunk needed by that Mapper.
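As a quick illustration in spark-shell (the HDFS path and host names below are hypothetical, not from the original post): for an HDFS-backed RDD, the preferred locations of a partition are simply the DataNodes holding the corresponding block, so Spark tries to schedule the task on one of them.

val input = sc.textFile("hdfs://namenode:8020/data/input.txt")
input.partitions.take(2).foreach { p =>
  println(s"partition ${p.index} -> ${input.preferredLocations(p)}")
}
// partition 0 -> List(datanode-1, datanode-2, datanode-3)   (illustrative output)
// partition 1 -> List(datanode-2, datanode-4, datanode-5)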

In the following paragraphs, we will go through the implementation of Spark 1.6 to understand how Spark decides the best place to run the tasks in terms of data locality.

I assume that the reader of this article understands the basic concepts in Spark, e.g. DAGScheduler, Stage, Job, etc., so I will not explain this terminology here.

The following code snippet shows how Spark submits a Stage to run. submitStage is a recursive function; the recursion bottoms out with a call to submitMissingTasks, which submits the tasks of the earliest stages of the Spark job, i.e. those with no missing parent stages.

org/apache/spark/scheduler/DAGScheduler.scala
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

In submitMissingTasks, we see the following lines:

org/apache/spark/scheduler/DAGScheduler.scala
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}

From the name of the variable, we can tell that DAGScheduler calls getPreferredLocs to build taskIdToLocations, which maps each partition ID to the locations where that partition's data, i.e. the input of the task computing that partition, resides.
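Purely as an illustration of the shape of this map (host and executor names are made up; TaskLocation is private[spark], so the snippet below only compiles inside the org.apache.spark.scheduler package), taskIdToLocations for a three-partition stage might look like this, mixing host-level preferences (e.g. HDFS blocks) and executor-level ones (e.g. cached blocks):

package org.apache.spark.scheduler

object TaskIdToLocationsExample {
  // Illustrative only: two partitions prefer the hosts of their HDFS blocks,
  // the third prefers the executor that caches its data.
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = Map(
    0 -> Seq(TaskLocation("datanode-1")),               // HostTaskLocation
    1 -> Seq(TaskLocation("datanode-2")),
    2 -> Seq(TaskLocation("worker-3", "executor-7"))    // ExecutorCacheTaskLocation
  )
}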

We then move forward to getPreferredLocs:

org/apache/spark/scheduler/DAGScheduler.scala
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}

org/apache/spark/scheduler/DAGScheduler.scala
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  Nil
}

The function getPreferredLocsInternal does the real work. After checking whether the partition has already been visited and whether it is cached, it calls the preferredLocations method of the RDD, which returns a sequence of host names of the preferred servers (see the following code). If this method returns an empty sequence, Spark tries to find a preferred location through the NarrowDependencies of the current RDD. A NarrowDependency means that each partition of the current RDD depends on only a small number of partitions of the parent RDD; we take the preferred locations of the first parent partition that has any as the preferred locations of the current partition. If we still cannot find a preferred location, we conclude that the current partition does not care where its task runs.

org/apache/spark/rdd/RDD.scala
final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}
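To see the difference between an RDD's own preferences and the scheduler's narrow-dependency fallback, here is a short spark-shell illustration (the HDFS path and host names are hypothetical): an RDD produced by map does not override getPreferredLocations, so asking it directly yields an empty list; it is the DAGScheduler's narrow-dependency walk shown earlier that falls back to the parent's HDFS block locations when the task is actually scheduled.

val input  = sc.textFile("hdfs://namenode:8020/data/input.txt")
val mapped = input.map(_.toUpperCase)                      // narrow (one-to-one) dependency on input

println(input.preferredLocations(input.partitions(0)))     // e.g. List(datanode-1, datanode-2)
println(mapped.preferredLocations(mapped.partitions(0)))   // List() -- no preference of its own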

The last question we would like to address is how the RDD's preferredLocations is actually implemented. From the preferredLocations method above, we see that the checkpoint of the RDD takes the highest priority. As one would guess, getPreferredLocations of the checkpoint RDD queries the hosts that store the checkpointed split:

org/apache/spark/rdd/ReliableCheckpointRDD.scala
protected override def getPreferredLocations(split: Partition): Seq[String] = {
  val status = fs.getFileStatus(
    new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index)))
  val locations = fs.getFileBlockLocations(status, 0, status.getLen)
  locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}
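As a usage sketch (the checkpoint directory below is a hypothetical HDFS path): once an RDD has been reliably checkpointed, RDD.preferredLocations delegates to the ReliableCheckpointRDD above and reports the DataNodes holding the checkpoint files.

sc.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")   // hypothetical checkpoint dir

val rdd = sc.parallelize(1 to 1000000, 4).map(_ * 2)
rdd.checkpoint()   // mark the RDD for reliable checkpointing
rdd.count()        // the first action writes the checkpoint files to HDFS

// Now the preferred locations come from the HDFS blocks of the checkpoint files.
println(rdd.preferredLocations(rdd.partitions(0)))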

If the current RDD is not checkpointed, we find the preferred location by calling the RDD's own implementation of getPreferredLocations.
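As a hypothetical illustration (the class and its host-pinned partitions are made up, not from the Spark code base), here is a minimal sketch of how a custom RDD, e.g. one reading shards from an external store, could expose its own placement preferences by overriding getPreferredLocations:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A partition that remembers which host holds its data.
class HostAwarePartition(val index: Int, val host: String) extends Partition

// An RDD whose partitions are pinned to specific hosts.
class HostAwareRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => new HostAwarePartition(i, h) }.toArray

  // The DAGScheduler picks this up through RDD.preferredLocations when it builds
  // taskIdToLocations, so each task is preferably scheduled on its partition's host.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostAwarePartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"data served from ${split.asInstanceOf[HostAwarePartition].host}")
}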

We use another example to show how an RDD implements its getPreferredLocations. A ShuffledRDD is generated after calling methods like partitionBy. ShuffledRDD implements getPreferredLocations by querying the locations of the shuffle data blocks from the MapOutputTracker, which runs on the driver side:

org/apache/spark/rdd/ShuffledRDD.scala
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
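To close, a small spark-shell sketch (again with made-up data): partitionBy produces a ShuffledRDD, and its preferred locations, if any, come from the MapOutputTracker on the driver. In Spark 1.6 a reduce partition only gets a preference when a single executor holds a sufficiently large fraction of its map output, so an empty list here is perfectly normal.

import org.apache.spark.HashPartitioner

val pairs    = sc.parallelize(1 to 100000, 8).map(i => (i % 10, i))
val shuffled = pairs.partitionBy(new HashPartitioner(4))   // this is a ShuffledRDD

shuffled.count()   // run the map side so the MapOutputTracker has registered the map outputs

shuffled.partitions.foreach { p =>
  println(s"reduce partition ${p.index} -> ${shuffled.preferredLocations(p)}")
}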