In this post, we will walk through the Spark source code to see how Spark decides the preferred locations of a task.
Data locality is a concept commonly used in data processing frameworks like Hadoop and Spark. It means that we prefer to place a computing task close to its input, so that we reduce the overhead of transferring the input data to the task. For example, in Hadoop, we want to run a Mapper task on the server whose HDFS DataNode stores the input chunk that the Mapper needs.
In the following paragraphs, we will go through the implementation of Spark 1.6 to understand how Spark decides the best place to run the tasks in terms of data locality.
I assume that the reader of this article understands the basic concepts in Spark, e.g. DAGScheduler, Stage, Job, etc., so I will not explain these terms here.
The following code snippet shows how Spark submits a Stage to run. It is a recursive function, and the end point of the recursion is the call to submitMissingTasks, which submits the first ready stage of the Spark job for execution.
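The shape of that recursion can be sketched as follows. This is a simplified Python model of the control flow, not Spark's actual Scala code; the Stage class and the submit_stage function here are hypothetical stand-ins.

```python
# Simplified sketch of the recursive stage submission (hypothetical classes;
# Spark's real submitStage lives in DAGScheduler.scala and is Scala code).
class Stage:
    def __init__(self, stage_id, parents):
        self.id = stage_id
        self.parents = parents    # parent stages this stage depends on
        self.submitted = False

def submit_stage(stage, submit_missing_tasks):
    """Recursively submit parent stages first; a stage with no unsubmitted
    parents is handed to submit_missing_tasks (the recursion's end point)."""
    missing = [p for p in stage.parents if not p.submitted]
    if not missing:
        submit_missing_tasks(stage)   # end of the recursion
        stage.submitted = True
    else:
        # submit missing parents (earliest stage first), then retry this stage
        for parent in sorted(missing, key=lambda s: s.id):
            submit_stage(parent, submit_missing_tasks)
        submit_stage(stage, submit_missing_tasks)
```

In a linear DAG this bottoms out at the first stage of the job, which matches the text above: submitMissingTasks is reached first for the earliest stage with no missing parents.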
In submitMissingTasks, we see the following lines:
As the variable name suggests, DAGScheduler calls the function getPreferredLocs to populate taskIdToLocations, which maps each partition ID to the locations where that partition's data (i.e. the input of the task for that partition) resides.
We then move on to getPreferredLocs:
The function getPreferredLocsInternal does the real work. From line 1545, we see that it first calls the RDD's preferredLocations method, which returns a sequence of host names of the preferred servers (see the following code). If this method returns nothing, Spark tries to find the preferred locations through the NarrowDependencies of the current RDD. A NarrowDependency means that each partition of the current RDD depends on only a small number of partitions of the parent RDD. Spark takes the preferred locations of any of the depended-on parent partitions as the preferred locations of the current partition. If it still cannot find any, it concludes that the current partition does not care where its task runs.
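The decision logic described above can be modeled like this. This is a simplified Python sketch, not Spark's API; the RDD and NarrowDep classes are hypothetical stand-ins for the real Scala classes.

```python
# Simplified model of the preferred-location lookup (hypothetical classes;
# Spark's real getPreferredLocsInternal is in DAGScheduler.scala).
class NarrowDep:
    def __init__(self, parent_rdd, get_parents):
        self.rdd = parent_rdd
        self.get_parents = get_parents  # partition id -> parent partition ids

class RDD:
    def __init__(self, prefs=None, deps=()):
        self.prefs = prefs or {}   # partition id -> list of host names
        self.deps = deps

    def preferred_locations(self, partition):
        return self.prefs.get(partition, [])

def get_preferred_locs(rdd, partition):
    # 1. Ask the RDD itself (in Spark, checkpointed data takes priority here).
    locs = rdd.preferred_locations(partition)
    if locs:
        return locs
    # 2. Otherwise, walk up narrow dependencies and take the first
    #    parent partition that has preferred locations.
    for dep in rdd.deps:
        if isinstance(dep, NarrowDep):
            for parent_part in dep.get_parents(partition):
                locs = get_preferred_locs(dep.rdd, parent_part)
                if locs:
                    return locs
    # 3. No preference found: the task can run anywhere.
    return []
```

Note that the recursion only follows narrow dependencies; crossing a shuffle boundary would mean every partition of the parent contributes, so no single parent location is meaningful there.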
The last question we would like to address is how an RDD's preferredLocations is implemented. From the code snippets above, we see that the RDD's checkpoint has the highest priority. As one might guess, the getPreferredLocations of a CheckpointRDD queries the hosts storing the checkpointed split:
If the current RDD is not checkpointed, we then find the preferred locations by calling its own implementation of getPreferredLocations.
Let us use another example to show how an RDD implements its getPreferredLocations. ShuffledRDD is generated by calling methods like partitionBy. ShuffledRDD implements getPreferredLocations by querying the locations of the shuffle data blocks from MapOutputTracker, which runs on the driver side:
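The idea can be sketched with a toy model: for one reduce partition, prefer the hosts that hold a large share of that partition's shuffle output. This Python sketch is hypothetical; it loosely mirrors the behavior of MapOutputTracker's location lookup in Spark 1.6 (which uses a fraction threshold), but the function name and the exact threshold here are assumptions.

```python
# Toy model: pick hosts holding at least `fraction` of the total shuffle
# output for one reduce partition (hypothetical sketch, not Spark's API).
from collections import defaultdict

def preferred_shuffle_hosts(map_statuses, reduce_id, fraction=0.2):
    """map_statuses: list of (host, {reduce_id: bytes}) pairs, one per
    completed map task, as a driver-side tracker might record them."""
    by_host = defaultdict(int)
    total = 0
    for host, sizes in map_statuses:
        size = sizes.get(reduce_id, 0)
        by_host[host] += size
        total += size
    if total == 0:
        return []   # no shuffle data yet: no preference
    return [h for h, s in by_host.items() if s >= fraction * total]
```

Running the reduce task on such a host lets it read a large fraction of its shuffle input locally instead of fetching it over the network, which is exactly the data-locality goal discussed at the start of the post.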