How to Load Data in Spark and Add Filename as a DataFrame Column?

Loading data in Spark and adding the source filename as a column to a DataFrame is a common requirement. The DataFrame API supports this directly through the built-in `input_file_name` function, so no RDD-level work is needed. Below, I’ll explain the process and illustrate it with examples in PySpark and Scala.

Load Data in Spark and Add Filename as a DataFrame Column

Suppose we have multiple CSV files located in a directory, and we want to read these files into a single DataFrame while also adding a column that indicates the source filename for each row. This can be achieved as follows:

Step-by-Step Explanation

1. Read the Data Files

Use the `spark.read.csv` method to read the data files into a DataFrame.

2. Extract the Filenames

Use the built-in `input_file_name` function, which returns the path of the file that each row was read from. No separate pass over the files is required.

3. Append Filename as a Column

Use `withColumn` to attach the result of `input_file_name()` to the DataFrame, so that each row carries an additional column with its source file path.

Example in PySpark

Here’s a code example demonstrating these steps in PySpark:


from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import os

# Initialize SparkSession
spark = SparkSession.builder.appName("AddFilenameExample").getOrCreate()

# Directory containing CSV files
input_directory = "path/to/csv/files"

# Read CSV files into a DataFrame
df = spark.read.csv(os.path.join(input_directory, "*.csv"), header=True, inferSchema=True)

# Add a column with the filename for each row
df_with_filename = df.withColumn("filename", input_file_name())

# Show the result
df_with_filename.show(truncate=False)

Example in Scala


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

// Initialize SparkSession
val spark = SparkSession.builder.appName("AddFilenameExample").getOrCreate()

// Directory containing CSV files
val inputDirectory = "path/to/csv/files"

// Read CSV files into a DataFrame
val df = spark.read.option("header", "true").option("inferSchema", "true").csv(s"${inputDirectory}/*.csv")

// Add a column with the filename for each row
val dfWithFilename = df.withColumn("filename", input_file_name())

// Show the result
dfWithFilename.show(truncate = false)

Output

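Assuming the CSV files have columns “id” and “value” and are named `file1.csv` and `file2.csv`, the output of `df_with_filename.show(truncate=False)` will look like this. Note that the `filename` column holds the full file URI rather than the bare file name; the exact scheme and absolute path depend on where the files are stored.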


+---+-----+---------------------------------+
|id |value|filename                         |
+---+-----+---------------------------------+
|1  |100  |file:/path/to/csv/files/file1.csv|
|2  |200  |file:/path/to/csv/files/file1.csv|
|3  |300  |file:/path/to/csv/files/file2.csv|
|4  |400  |file:/path/to/csv/files/file2.csv|
+---+-----+---------------------------------+

This approach dynamically adds the filename to each row in the DataFrame, providing traceability of the source data.
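
Since the `filename` column above contains the full URI, you may want only the base name (for example `file1.csv`). The following is a minimal sketch of one way to do that, not the only one. The helper names `basename_from_uri` and `with_short_filename`, and the column name `short_filename`, are hypothetical and chosen just for illustration. The sketch also assumes the URIs may contain percent-escapes such as `%20`, which it decodes.

```python
import posixpath
from typing import Optional
from urllib.parse import unquote, urlparse


def basename_from_uri(uri: Optional[str]) -> Optional[str]:
    """Return the last path component of a file URI, e.g. 'file1.csv'."""
    if uri is None:
        return None
    # urlparse separates the scheme (file, hdfs, s3a, ...) from the path part.
    path = urlparse(uri).path
    # Decode percent-escapes (such as %20 for a space), then keep the last segment.
    return posixpath.basename(unquote(path))


def with_short_filename(df, source_col="filename", target_col="short_filename"):
    """Hypothetical helper: add a column holding only the base file name."""
    # Imported here so the pure-Python helper above can be used without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    to_basename = udf(basename_from_uri, StringType())
    return df.withColumn(target_col, to_basename(df[source_col]))
```

With the DataFrame from the PySpark example, `with_short_filename(df_with_filename).show(truncate=False)` would add a `short_filename` column next to `filename`. A Python UDF is used here only to keep the parsing logic easy to read and test. On large datasets, Spark's built-in `regexp_extract` applied to the `filename` column is likely to be faster because it avoids Python serialization, although it does not decode percent-escapes.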

