In Apache Spark, writing a single CSV file without creating a folder is often required for ease of use and compatibility with other systems. By default, Spark writes the output of a DataFrame as multiple part files inside a folder. Coalescing the DataFrame into a single partition before writing guarantees a single part file, but Spark still places that file inside a folder, so a final rename-and-cleanup step is needed to end up with one standalone CSV file.
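The core idea, as a minimal sketch (df stands in for any existing DataFrame):
# Coalesce to one partition so Spark emits exactly one part file.
# Note: Spark still wraps that file in an output folder.
df.coalesce(1).write.csv("/path/to/output", header=True, mode="overwrite")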
Let’s explore an example using PySpark, followed by an equivalent in Scala.
Single CSV File Write Using PySpark
Steps:
- Create a SparkSession.
- Create a sample DataFrame.
- Coalesce the DataFrame into a single partition.
- Write the DataFrame to a CSV file.
- Rename the CSV file (optional).
Code Example in PySpark
from pyspark.sql import SparkSession
# Step 1: Create a SparkSession
spark = SparkSession.builder.appName("SingleCSVWrite").getOrCreate()
# Step 2: Sample DataFrame creation
data = [("James", "Smith", "USA", 34),
("Michael", "Rose", "USA", 33),
("Robert", "Williams", "USA", 37),
("Maria", "Jones", "USA", 28)]
columns = ["Firstname", "Lastname", "Country", "Age"]
df = spark.createDataFrame(data, schema=columns)
# Step 3: Coalesce the DataFrame into a single partition
df_coalesced = df.coalesce(1)
# Step 4: Write the DataFrame to a CSV file
output_path = "/path/to/output/single_csv"
df_coalesced.write.csv(output_path, header=True, mode="overwrite")
# Step 5: Rename the CSV file (Optional)
import shutil
import os
# Find the part file in the output directory
part_file = next(f for f in os.listdir(output_path) if f.startswith("part-") and f.endswith(".csv"))
# Rename the part file to a desired name
shutil.move(os.path.join(output_path, part_file), "/path/to/output/single_file.csv")
# Cleanup: remove the leftover folder (it still contains _SUCCESS and checksum files)
shutil.rmtree(output_path)
Code Output
single_file.csv
---------------
Firstname,Lastname,Country,Age
James,Smith,USA,34
Michael,Rose,USA,33
Robert,Williams,USA,37
Maria,Jones,USA,28
In this example, we created a DataFrame and coalesced it into a single partition, which ensures that Spark writes the data into one part file. After writing, we located the part file Spark created, renamed it to the desired output filename, and cleaned up the leftover folder and files created in the process.
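If the data is small enough to fit in the driver's memory, a simpler alternative (a sketch, assuming pandas is installed on the driver) avoids the folder-and-rename dance entirely:
# Collect the DataFrame to the driver and write one file directly.
# Only suitable for small data: everything must fit in driver memory.
df.toPandas().to_csv("/path/to/output/single_file.csv", index=False)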
One caveat: os and shutil operate only on the driver's local filesystem. If the output path lives on HDFS, S3, or another distributed store, the rename has to go through Hadoop's FileSystem API instead, as in the sketch below.
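Here is a hedged sketch of that approach in PySpark. It reaches the Hadoop API through Spark's JVM gateway via the internal _jsc and _jvm attributes, which are not part of the public API, so treat it as illustrative:
# Obtain a Hadoop FileSystem handle for the cluster's default filesystem.
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
# Locate the single part file inside the output folder.
statuses = fs.listStatus(Path(output_path))
part = next(s.getPath() for s in statuses if s.getPath().getName().startswith("part-"))
# Rename it in place, then remove the now-redundant folder recursively.
fs.rename(part, Path("/path/to/output/single_file.csv"))
fs.delete(Path(output_path), True)
You can use a similar approach in Scala or Java, ensuring that you coalesce the DataFrame and handle the file system operations accordingly.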
Single CSV File Write Using Scala
Steps:
- Create a SparkSession.
- Create a sample DataFrame.
- Coalesce the DataFrame into a single partition.
- Write the DataFrame to a CSV file.
- Rename the CSV file (optional).
Code Example in Scala
import org.apache.spark.sql.SparkSession
// Step 1: Create a SparkSession
val spark = SparkSession.builder.appName("SingleCSVWrite").getOrCreate()
// Step 2: Sample DataFrame creation
import spark.implicits._
val data = Seq(("James", "Smith", "USA", 34),
               ("Michael", "Rose", "USA", 33),
               ("Robert", "Williams", "USA", 37),
               ("Maria", "Jones", "USA", 28))
val df = data.toDF("Firstname", "Lastname", "Country", "Age")
// Step 3: Coalesce the DataFrame into a single partition
val dfCoalesced = df.coalesce(1)
// Step 4: Write the DataFrame to a CSV file
val outputPath = "/path/to/output/single_csv"
dfCoalesced.write.option("header", "true").mode("overwrite").csv(outputPath)
// Step 5: Rename the CSV file (Optional)
import java.io.File
import scala.reflect.io.Directory
// Find the part file in the output directory
val dir = new Directory(new File(outputPath))
val partFile = dir.files.find(f => f.name.startsWith("part-") && f.name.endsWith(".csv")).get
// Rename the part file to a desired name
partFile.jfile.renameTo(new File("/path/to/output/single_file.csv"))
// Cleanup: remove the leftover folder (it still holds _SUCCESS and checksum files)
dir.deleteRecursively()
Code Output
single_file.csv
---------------
Firstname,Lastname,Country,Age
James,Smith,USA,34
Michael,Rose,USA,33
Robert,Williams,USA,37
Maria,Jones,USA,28
This example follows the same pattern as the PySpark one but is implemented in Scala. The steps and the final output are identical.
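A final note on performance, applicable to both versions: coalesce(1) avoids a shuffle but funnels the entire dataset through a single task, while repartition(1) performs a full shuffle first and keeps the upstream stages parallel. Both end in one output file, and either is only appropriate when the data fits comfortably on a single executor. A one-line PySpark sketch of the repartition variant:
# repartition(1) shuffles before writing; upstream work stays parallel.
df.repartition(1).write.csv(output_path, header=True, mode="overwrite")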