Calculating a rolling average (or moving average) over time series data in PySpark involves a few key steps. We will use window functions together with the `DataFrame` API for this purpose. Let’s go through the process step by step with explanations and code snippets.
Step-by-Step Guide
1. Import Necessary Libraries
First, you need to import the required libraries and initialize a Spark session if you haven’t already.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder \
    .appName("RollingAverage") \
    .getOrCreate()
2. Create Sample Data
Next, let’s create a sample DataFrame containing time series data. For simplicity, we’ll use a small dataset with daily dates and integer values.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
import datetime
# Sample data: one row per day
data = [
    Row(date=datetime.date(2023, 1, 1), value=100),
    Row(date=datetime.date(2023, 1, 2), value=200),
    Row(date=datetime.date(2023, 1, 3), value=300),
    Row(date=datetime.date(2023, 1, 4), value=400),
    Row(date=datetime.date(2023, 1, 5), value=500),
]
schema = StructType([
    StructField("date", DateType(), True),
    StructField("value", IntegerType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()
+----------+-----+
| date|value|
+----------+-----+
|2023-01-01| 100|
|2023-01-02| 200|
|2023-01-03| 300|
|2023-01-04| 400|
|2023-01-05| 500|
+----------+-----+
3. Define Window Specification
To calculate the rolling average, we need to define a window specification. The window defines the range of rows over which the calculation will occur.
window_spec = Window.orderBy("date").rowsBetween(-2, 0)
In this example, the window specification `rowsBetween(-2, 0)` defines a rolling window that includes the current row and the two preceding rows.
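Note that ordering a window without a `partitionBy` clause moves all rows into a single partition, and Spark logs a performance warning for this; that is fine for a small example but should be avoided on large datasets. If your data holds several independent series, you can partition the window by an identifier column. The snippet below is a minimal sketch that assumes a hypothetical `series_id` column, which is not part of the sample data above:
# Hypothetical: a per-series rolling window, assuming a "series_id" column
# that identifies each independent time series (not in the sample data above).
per_series_window = (
    Window.partitionBy("series_id")
    .orderBy("date")
    .rowsBetween(-2, 0)
)
This computes the same three-row average, but separately within each series.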
4. Calculate the Rolling Average
We can now use the `avg` function along with our window specification to calculate the rolling average.
df_with_rolling_avg = df.withColumn("rolling_avg", avg(col("value")).over(window_spec))
df_with_rolling_avg.show()
+----------+-----+-----------+
| date|value|rolling_avg|
+----------+-----+-----------+
|2023-01-01| 100| 100.0|
|2023-01-02| 200| 150.0|
|2023-01-03| 300| 200.0|
|2023-01-04| 400| 300.0|
|2023-01-05| 500| 400.0|
+----------+-----+-----------+
This code adds a new column `rolling_avg` to the DataFrame that contains the rolling average values.
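Note that the first two rows are averaged over fewer than three values, because the window simply contains fewer preceding rows at the start of the data. If you only want an average once a full three-row window is available, one option (a sketch, not part of the example above) is to count the rows in the same window and null out the partial ones:
from pyspark.sql.functions import count, when
# Keep the rolling average only where the window actually contains 3 rows;
# rows with a partial window get null because no otherwise() is given.
df_full_windows = df_with_rolling_avg.withColumn(
    "rolling_avg_full",
    when(count(col("value")).over(window_spec) == 3, col("rolling_avg")),
)
df_full_windows.show()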
Conclusion
By following these steps, you can calculate a rolling average for time series data in PySpark. Window functions allow flexible and efficient computations over specified ranges of rows. This example demonstrates a simple three-row rolling average, which equals a three-day average here only because the data contains exactly one row per day; you can adjust the window specification to fit your specific needs.
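If your dates can have gaps, a row-based window no longer corresponds to a fixed number of days. One way to handle this, shown as a sketch below rather than as part of the example above, is to order by the date converted to epoch seconds and use `rangeBetween` so the window covers an actual three-day span:
# A time-based window: the current row plus any rows whose date falls
# within the preceding two days (2 * 86400 seconds).
time_window = (
    Window.orderBy(col("date").cast("timestamp").cast("long"))
    .rangeBetween(-2 * 86400, 0)
)
df_time_avg = df.withColumn("rolling_avg_3d", avg(col("value")).over(time_window))
df_time_avg.show()
With the sample data above this produces the same result as the row-based window, but it preserves the three-day meaning even when some dates are missing.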