What Does ‘Stage Skipped’ Mean in Apache Spark Web UI?
In the Apache Spark Web UI, ‘Stage Skipped’ means that a stage did not need to run because its output was already available, typically from a previous shuffle or from cached data. In Spark, a job is divided into stages at shuffle boundaries; each stage is a set of tasks that run the same computation on different partitions of the data, and each task is a unit of work sent to an executor.
Here are a few reasons why a stage might be skipped:
1. Cache/Checkpoint:
If the data produced by an earlier stage has been cached or checkpointed and a later job needs it, Spark reads the stored data back instead of re-running that stage.
2. Re-computation Avoidance:
If a stage’s output is already available, most commonly because its shuffle files from an earlier job are still present on the executors, Spark reuses that output instead of re-running the stage. This is the most frequent reason a stage appears as skipped (a short sketch follows this list).
3. Speculative Execution:
Speculative execution can launch duplicate attempts of slow tasks and keeps whichever attempt finishes first. Speculation itself operates at the task level, but if the results a stage depends on are already available (for example in the cache) when a later job runs, that stage may still show up as skipped.
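Shuffle-output reuse is the easiest case to reproduce with the low-level RDD API, because running a second action on the same shuffled RDD reuses the shuffle files written by the first job. The following is a minimal sketch; the application name and the local[2] master are arbitrary choices for illustration:
from pyspark.sql import SparkSession
# Create a local session (names and master are illustrative only)
spark = SparkSession.builder.master("local[2]").appName("ShuffleReuseSketch").getOrCreate()
sc = spark.sparkContext
# Build an RDD whose lineage contains a shuffle (reduceByKey)
pairs = sc.parallelize(range(1000), 4).map(lambda x: (x % 10, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
# Job 1: runs the shuffle map stage and the result stage
counts.collect()
# Job 2: the shuffle files from Job 1 are still available, so the map stage
# is listed under "Skipped Stages" on this job's page in the Spark UI
counts.collect()
The remaining examples focus on caching, which is the other common way a stage’s output becomes available without re-running it.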
To make this more concrete, let’s see an example using PySpark:
from pyspark.sql import SparkSession
# Create Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Create an example DataFrame
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Cache the DataFrame
df.cache()
# Perform transformations
df_filtered = df.filter(df.Age > 30)
# Perform an action to trigger the job
df_filtered.show()
Note that `cache()` is lazy: the DataFrame is actually materialized in memory the first time an action (here, `show()`) runs. When you perform subsequent actions on the cached DataFrame, Spark reads the in-memory data instead of re-running the stages that produced it, and those stages can appear as skipped in the Spark UI.
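You can confirm the cache from the driver as well as from the Storage tab of the Spark UI. This is a small sketch using the df defined above; is_cached and storageLevel are standard DataFrame properties in recent PySpark versions:
# cache() only marks the DataFrame; the first action (show() above) materializes it
print(df.is_cached)      # True once cache() has been called
print(df.storageLevel)   # e.g. Disk Memory Deserialized 1x Replicated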
Here’s an extended example illustrating cache and ‘Stage Skipped’:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession
spark = SparkSession.builder.master("local").appName("StageSkippedExample").getOrCreate()
# Example DataFrame
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29), ("Dave", 31), ("Eve", 39)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# Cache the DataFrame
df.cache()
# First transformation and action
df_filtered_1 = df.filter(col("Age") > 30)
df_filtered_1.show()
# Second action using the cached DataFrame
df_filtered_2 = df.filter(col("Age") < 40)
df_filtered_2.show()
In this code, the first `df.filter(col("Age") > 30).show()` triggers a job that both runs the filter and materializes the cache. The second filter runs against the cached data, so Spark does not rebuild `df`; in the Spark UI, any stage whose output is already available is listed under ‘Skipped Stages’ on the job’s detail page (with a tiny local example like this the effect can be subtle, but the mechanism is the same in larger jobs).
Output:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
| Dave| 31|
|  Eve| 39|
+-----+---+
+---------+---+
|     Name|Age|
+---------+---+
|    Alice| 34|
|Catherine| 29|
|     Dave| 31|
|      Eve| 39|
+---------+---+
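If you want to confirm from the driver that the second query reads the cached data rather than recomputing df, you can inspect the physical plan. The exact formatting varies by Spark version, but a cached source typically appears as an InMemoryTableScan over an InMemoryRelation:
# The physical plan of the second filter reads from the cache instead of rebuilding df
df_filtered_2.explain()
# == Physical Plan == (abridged; details vary by Spark version)
# Filter (Age < 40)
#   InMemoryTableScan [Name, Age]
#     InMemoryRelation [Name, Age], StorageLevel(disk, memory, deserialized, 1 replicas)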
Conclusion:
By understanding why a stage is skipped, you can better optimize your Spark jobs, leading to more efficient use of resources and faster execution times. Skipped stages are usually a sign that caching or shuffle reuse is working as intended rather than a problem to fix, and checking the Spark Web UI regularly is a good way to confirm how your jobs actually execute.