How to Calculate Rolling Average with PySpark in Time Series Data?

Calculating a rolling average (or moving average) in PySpark for time series data involves a few key steps. We will use a combination of window functions and the `DataFrame` API for this purpose. Let’s go through the process step-by-step with a detailed explanation and code snippet.

Step-by-Step Guide

1. Import Necessary Libraries

First, you need to import the required libraries and initialize a Spark session if you haven’t already.


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RollingAverage") \
    .getOrCreate()

2. Create Sample Data

Next, let’s create a sample DataFrame containing time series data. For simplicity, we’ll use a simple dataset with dates and values.


from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
import datetime

# Sample data
data = [
    Row(date=datetime.date(2023, 1, 1), value=100),
    Row(date=datetime.date(2023, 1, 2), value=200),
    Row(date=datetime.date(2023, 1, 3), value=300),
    Row(date=datetime.date(2023, 1, 4), value=400),
    Row(date=datetime.date(2023, 1, 5), value=500),
]

schema = StructType([
    StructField("date", DateType(), True),
    StructField("value", IntegerType(), True),
])

df = spark.createDataFrame(data, schema)
df.show()

+----------+-----+
|      date|value|
+----------+-----+
|2023-01-01|  100|
|2023-01-02|  200|
|2023-01-03|  300|
|2023-01-04|  400|
|2023-01-05|  500|
+----------+-----+

3. Define Window Specification

To calculate the rolling average, we need to define a window specification. The window defines the range of rows over which the calculation will occur.


window_spec = Window.orderBy("date").rowsBetween(-2, 0)

In this example, the window specification `rowsBetween(-2, 0)` defines a rolling window that includes the current row and the two preceding rows.

4. Calculate the Rolling Average

We can now use the `avg` function along with our window specification to calculate the rolling average.


df_with_rolling_avg = df.withColumn("rolling_avg", avg(col("value")).over(window_spec))
df_with_rolling_avg.show()

+----------+-----+-----------+
|      date|value|rolling_avg|
+----------+-----+-----------+
|2023-01-01|  100|      100.0|
|2023-01-02|  200|      150.0|
|2023-01-03|  300|      200.0|
|2023-01-04|  400|      300.0|
|2023-01-05|  500|      400.0|
+----------+-----+-----------+

This code adds a new column `rolling_avg` to the DataFrame that contains the rolling average values.

Conclusion

By following these steps, you can calculate the rolling average for time series data in PySpark. Using window functions allows for flexible and efficient computations over specified ranges of rows. This example demonstrates a simple three-day rolling average, but you can adjust the window specification to fit your specific needs.

About Editorial Team

Our Editorial Team is made up of tech enthusiasts who are highly skilled in Apache Spark, PySpark, and Machine Learning. They are also proficient in Python, Pandas, R, Hive, PostgreSQL, Snowflake, and Databricks. They aren't just experts; they are passionate teachers. They are dedicated to making complex data concepts easy to understand through engaging and simple tutorials with examples.

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top