Loading data in Spark and adding the source filename as a column to a DataFrame is a common scenario. This can be done with the DataFrame API and the built-in `input_file_name` function. Below is a detailed explanation along with examples in both PySpark and Scala.
Load Data in Spark and Add Filename as a DataFrame Column
Suppose we have multiple CSV files located in a directory, and we want to read these files into a single DataFrame while also adding a column that indicates the source filename for each row. This can be achieved as follows:
Step-by-Step Explanation
1. Read the Data Files
Use the `spark.read.csv` method to read the data files into a DataFrame.
2. Extract the Filenames
Use the built-in `input_file_name()` function, which returns the full path (as a URI) of the file each row was read from.
3. Append Filename as a Column
Use `withColumn` to attach the result of `input_file_name()` as a new column, producing a DataFrame where each row carries its source filename.
Example in PySpark
Here’s a code example demonstrating these steps in PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import os
# Initialize SparkSession
spark = SparkSession.builder.appName("AddFilenameExample").getOrCreate()
# Directory containing CSV files
input_directory = "path/to/csv/files"
# Read CSV files into a DataFrame
df = spark.read.csv(os.path.join(input_directory, "*.csv"), header=True, inferSchema=True)
# Add a column with the filename for each row
df_with_filename = df.withColumn("filename", input_file_name())
# Show the result
df_with_filename.show(truncate=False)
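Note that `input_file_name()` returns the full URI of the source file. If only the base filename (e.g. `file1.csv`) is wanted, the directory portion can be stripped with a regular expression; in Spark this would typically be done with `pyspark.sql.functions.regexp_extract`. The pattern logic can be sketched in plain Python:

```python
import re

# Capture everything after the last "/" in the file URI.
# In Spark, the same pattern would be applied with:
#   df.withColumn("filename", regexp_extract(input_file_name(), r"([^/]+)$", 1))
BASENAME_PATTERN = re.compile(r"([^/]+)$")

def basename_from_uri(uri: str) -> str:
    """Return the final path component of a file URI."""
    match = BASENAME_PATTERN.search(uri)
    return match.group(1) if match else uri

print(basename_from_uri("file:/path/to/csv/files/file1.csv"))  # file1.csv
```

The `$` anchor ensures the match is the last path segment, so the same pattern works for `file:`, `hdfs://`, and `s3a://` URIs alike.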
Example in Scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.input_file_name
// Initialize SparkSession
val spark = SparkSession.builder.appName("AddFilenameExample").getOrCreate()
// Directory containing CSV files
val inputDirectory = "path/to/csv/files"
// Read CSV files into a DataFrame
val df = spark.read.option("header", "true").option("inferSchema", "true").csv(s"${inputDirectory}/*.csv")
// Add a column with the filename for each row
val dfWithFilename = df.withColumn("filename", input_file_name())
// Show the result
dfWithFilename.show(truncate = false)
Output
Assuming the CSV files have columns “id” and “value” and are named `file1.csv` and `file2.csv`, the output of `df_with_filename.show(truncate=False)` will look like this:
+---+-----+---------------------------------+
| id|value|filename                         |
+---+-----+---------------------------------+
|  1|  100|file:/path/to/csv/files/file1.csv|
|  2|  200|file:/path/to/csv/files/file1.csv|
|  3|  300|file:/path/to/csv/files/file2.csv|
|  4|  400|file:/path/to/csv/files/file2.csv|
+---+-----+---------------------------------+
This approach dynamically adds the filename to each row in the DataFrame, providing traceability of the source data.
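As the output shows, `input_file_name()` returns a URI (here with a `file:` scheme; on HDFS it would start with `hdfs://`) rather than a plain filesystem path. If downstream code needs the path component alone, standard URI parsing applies. A plain-Python sketch:

```python
from urllib.parse import urlparse

def path_from_uri(uri: str) -> str:
    """Extract the filesystem path component from a file URI."""
    return urlparse(uri).path

print(path_from_uri("file:/path/to/csv/files/file1.csv"))  # /path/to/csv/files/file1.csv
```

In Spark itself, the same cleanup could be done column-wise with a regex-based function such as `regexp_replace` to drop the scheme prefix.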