Spark Read CSV file as DataFrame

Spark read csv file as dataframe : – Apache Spark, an open-source, distributed computing system, is one of the most powerful tools used for big data analytics. It is known for its speed and ease of use, especially with complex data processing tasks. In this article, we focus on reading and writing CSV files using DataFrame.

Spark Reading CSV Files into DataFrame

Spark offers an API called spark.read.csv() or spark.read.format("csv") to read a CSV file into DataFrame. Here’s an example:

val df = spark.read.csv("/path/to/your/csvfile.csv")

Reading Multiple CSV Files

In cases where data is split across various CSV files, Spark provides an efficient way to read multiple files at once.

val df = spark.read.csv("/path/to/your/csvfile1.csv", "/path/to/your/csvfile2.csv")

Reading All CSV Files in a Directory

Rather than providing every file path individually, you can read all CSV files from a directory at once as follows:

val df = spark.read.csv("/path/to/directory/*")

Mode

In Apache Spark, when reading a CSV file into a DataFrame, you can use various modes to control the behavior, especially when dealing with issues like file parsing errors or schema mismatches. Here are the main modes available for reading CSV files in Spark:

Permissive Mode (Default)

Permissive mode is the default mode. It attempts to read all the records in a CSV file, skipping any records that cause parsing errors.

val df = spark.read
  .option("mode", "permissive")
  .csv("path/to/your/file.csv")

DROPMALFORMED Mode

In this mode, Spark skips and drops any rows that have parsing errors.

val df = spark.read
  .option("mode", "DROPMALFORMED")
  .csv("path/to/your/file.csv")

FAILFAST Mode

This mode stops the reading process as soon as it encounters any parsing error and fails the entire operation.

val df = spark.read
  .option("mode", "FAILFAST")
  .csv("path/to/your/file.csv")

Options while Reading CSV File

You can customize CSV reading with various options like delimiter, inferSchema, header, quote, nullValues, dateFormat:

Delimiter

The delimiter option is used to specify the character that separates fields in a CSV file. By default, Spark assumes the delimiter is a comma (,), but you can use this option to handle files with different delimiters, such as semicolons, tabs, or other characters.

Example:

val df = spark.read
  .option("delimiter", ";")
  .csv("path/to/your/file.csv")

InferSchema

The inferSchema option controls whether Spark should automatically infer the data types of each column. When set to true (default), Spark examines a sample of the data to determine the appropriate data types for each column. If set to false, Spark assumes all columns are strings.

Example:

val df = spark.read
  .option("inferSchema", "false")
  .csv("path/to/your/file.csv")

Header

The header option indicates whether the first row of the CSV file contains the column names. If set to true (default), Spark uses the first row as column names; if set to false, Spark generates default column names.

val df = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("path/to/your/file.csv")

Use header with false when your CSV file doesn’t have a header row.

Quotes

The quotes option allows you to specify the quote character used to enclose fields containing special characters or the delimiter itself. This is crucial when dealing with CSV files that use quotes to distinguish between the actual data and the delimiter.

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "'")
  .csv("path/to/your/file.csv")

NullValues

The nullValues option lets you define a string that represents a null value in your CSV file. Spark replaces occurrences of this string with a null value in the resulting DataFrame.

Example:

val df = spark.read
  .option("nullValue", "NA")
  .csv("path/to/your/file.csv")

DateFormat

The dateFormat option is used to specify the format of date and timestamp columns in the CSV file. This is essential when Spark is expected to parse date and timestamp values accurately.

Example:

val df = spark.read
  .option("dateFormat", "yyyy-MM-dd")
  .csv("path/to/your/file.csv")

Here, the date format is set to “yyyy-MM-dd”. Adjust this format based on the actual format used in your CSV file.

Reading CSV Files with a User-Specified Schema

It is possible to manually specify the schema when reading a CSV file. This gives you more control over the data types in the DataFrame.

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("dob", DateType, true)
))

val df = spark.read
    .option("header", "true")
    .schema(customSchema)
    .csv("/path/to/your/csvfile.csv")

DataFrame Transformations

Transformations are operations in Spark that produce a new DataFrame from an existing one. Once you have a DataFrame in Spark, you can apply a wide range of transformations and actions to manipulate and analyze your data.

Example:

val df2 = df.filter($"age" > 30).select($"name", $"age")

Writing DataFrame to CSV File

To save the processed data back to disk, you can write the DataFrame to a CSV file using write.csv() method.

df.write.csv("/path/to/output/csvfile.csv")

Options When Writing CSV

There are several options you can configure when writing a DataFrame to a CSV file, like delimiter, quote, and header.

df.write
  .option("delimiter", ";")
  .option("quote", "\"")
  .option("header", "true")
  .csv("/path/to/output/csvfile.csv")

Save Mode

When writing a DataFrame to a CSV file in Spark, the mode option allows you to specify the behavior when the target directory already exists. Here are various save modes you can use:

Overwrite

This mode will overwrite the content of the target directory if it already exists. It’s useful when you want to replace the existing data with the new content.

df.write
  .mode("overwrite")
  .csv("path/to/save/data")

Append

The append mode adds the new data to the existing content in the target directory. It’s handy when you want to add more data to an already existing dataset.

df.write
  .mode("append")
  .csv("path/to/save/data")

Ignore

The ignore mode avoids writing the DataFrame if the target directory already exists. No changes will be made to the existing data.

df.write
  .mode("ignore")
  .csv("path/to/save/data")

Error (Default)

If you don’t specify a mode, Spark uses the error mode by default. If the target directory already exists, Spark throws an error, preventing accidental data overwrites. This mode ensures data integrity and can be useful in production scenarios.

df.write
  .csv("path/to/save/data")

Choose the appropriate mode based on your use case and whether you want to overwrite, append, ignore, or explicitly handle errors when writing the DataFrame to a CSV file.

Spark Read CSV file as DataFrame Conclusion

In summary, Spark provides powerful tools for loading, transforming, and saving large-scale data. Its CSV capabilities, as examined in this article, are truly flexible and efficient, making Spark a solid choice for data analysis tasks.

About Editorial Team

Our Editorial Team is made up of tech enthusiasts who are highly skilled in Apache Spark, PySpark, and Machine Learning. They are also proficient in Python, Pandas, R, Hive, PostgreSQL, Snowflake, and Databricks. They aren't just experts; they are passionate teachers. They are dedicated to making complex data concepts easy to understand through engaging and simple tutorials with examples.

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top