I’m currently working on a new project. I have the following question when designing a new feature:
when a task is restarted due to the failure in an executor, how Spark decides whether to run the task in this executor to guarantee the data locality (assume RDD is persisted with the replication factor of 1), or run the task somewhere else to avoid to be failed for the same reason in that executor?
The answer is simple, the information describing the task failed (the task ID as well as the clock time of the failure) will be added to an internal data structure of TaskSetManager. When the task is to be dequeued from the task queue again (with the order respecting the data locality, PROCESS_LOCAL, MACHINE_LOCAL, RACK_LOCAL, ANY), TaskSetManager will check the interval between the current time and the last failure moment. If the length of the interval is longer than a configurable entry
spark.scheduler.executorTaskBlacklistTime, then the task is executed normally. Otherwise, the task will be kept in the queue for the next chance to be dequeued.
In fact, Spark sets
spark.scheduler.executorTaskBlacklistTime as 0 by default, i.e. Spark gives full respect to the data locality by default. When the user set this parameter to be some value larger than 0, she can prevent the abnormal executor from causing repeated task failure in case that the driver hasn’t detected the problem with the executor.