Calculating the median and quantiles in PySpark is not as straightforward as computing basic statistics like the mean and sum, due to the distributed nature of data in Spark. However, we can achieve this using a combination of approxQuantile for quantiles and some custom logic for the median. Let’s go through these steps with an example.
Step-by-Step Guide to Calculate Median and Quantiles with PySpark GroupBy
We’ll start by setting up a PySpark session and creating a sample DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, percent_rank
# Create Spark Session
spark = SparkSession.builder.appName("MedianQuantiles").getOrCreate()
# Sample DataFrame
data = [
    ("Alice", "HR", 2000),
    ("Bob", "HR", 1500),
    ("Catherine", "HR", 3000),
    ("David", "Tech", 2000),
    ("Edward", "Tech", 4000),
    ("Frank", "Tech", 2000),
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()
+---------+----------+------+
| Name|Department|Salary|
+---------+----------+------+
| Alice| HR| 2000|
| Bob| HR| 1500|
|Catherine| HR| 3000|
| David| Tech| 2000|
| Edward| Tech| 4000|
| Frank| Tech| 2000|
+---------+----------+------+
Calculating Quantiles
Use the approxQuantile method to calculate quantiles; it gives an approximate result controlled by a relative error parameter.
# Function to calculate quantiles
def calculate_quantiles(df, column, probabilities):
    return df.approxQuantile(column, probabilities, relativeError=0.01)
# For the 'Salary' column in our sample df
quantiles = calculate_quantiles(df, "Salary", [0.25, 0.5, 0.75])
print(f"25th, 50th (Median), and 75th percentiles of Salary: {quantiles}")
25th, 50th (Median), and 75th percentiles of Salary: [2000.0, 2000.0, 3000.0]
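Note that approxQuantile operates on the DataFrame as a whole, so it cannot produce per-department quantiles directly. If you are on Spark 3.1 or later, one option for grouped quantiles is the percentile_approx aggregate function combined with groupBy; here is a minimal sketch under that assumption:
from pyspark.sql import functions as F

# Approximate 25th, 50th, and 75th percentiles of Salary per Department
# (percentile_approx is available in pyspark.sql.functions from Spark 3.1 onwards)
quantiles_by_dept = df.groupBy("Department").agg(
    F.percentile_approx("Salary", [0.25, 0.5, 0.75]).alias("salary_quantiles")
)
quantiles_by_dept.show(truncate=False)
Each department then gets a single row containing an array of the three approximate quantiles.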
Calculating Median with GroupBy
Median calculation is more involved because it needs to consider the distribution of the data within each group. To calculate the median, you can use SQL window functions like percent_rank, along with some custom filtering logic.
from pyspark.sql import Window
from pyspark.sql.functions import col, percent_rank
# Define window specification
window_spec = Window.partitionBy("Department").orderBy("Salary")
# Add percent_rank for each row
df_with_rank = df.withColumn("percent_rank", percent_rank().over(window_spec))
df_with_rank.show()
+---------+----------+------+------------+
| Name|Department|Salary|percent_rank|
+---------+----------+------+------------+
| Bob| HR| 1500| 0.0|
| Alice| HR| 2000| 0.5|
|Catherine| HR| 3000| 1.0|
| David| Tech| 2000| 0.0|
| Frank| Tech| 2000| 0.0|
| Edward| Tech| 4000| 1.0|
+---------+----------+------+------------+
Now, filter the DataFrame for rows where percent_rank is exactly 0.5 to pick out the median row of each department.
# Keep the rows whose percent_rank is exactly 0.5 (the middle of each ordered partition)
median_df = df_with_rank.filter(col("percent_rank") == 0.5)
# Show median by Department
median_df.show()
+-----+----------+------+------------+
| Name|Department|Salary|percent_rank|
+-----+----------+------+------------+
|Alice| HR| 2000| 0.5|
+-----+----------+------+------------+
In this example, the median for the HR department is 2000. The Tech department, however, has no row with a percent_rank of exactly 0.5, because two of its rows share the same salary and the tie collapses their rank, so this filter returns nothing for it. You might want to handle such edge cases differently based on your needs.
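If all you need is a single median value per department rather than the full median rows, a simpler alternative is to aggregate with Spark SQL’s percentile expression, which computes an exact percentile for numeric columns. A sketch:
from pyspark.sql import functions as F

# Exact median of Salary per Department via the SQL percentile aggregate
median_by_dept = df.groupBy("Department").agg(
    F.expr("percentile(Salary, 0.5)").alias("median_salary")
)
median_by_dept.show()
On this sample data, both departments come out with a median salary of 2000.0.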
Conclusion
Calculating the median and quantiles in PySpark involves a combination of built-in functions like approxQuantile and window operations. While quantiles can be calculated directly with approxQuantile, median calculation requires more detailed handling with window functions and custom filtering.