Spark Read CSV File as DataFrame – Apache Spark is an open-source, distributed computing system and one of the most powerful tools for big data analytics. It is known for its speed and ease of use, especially with complex data processing tasks. In this article, we focus on reading and writing CSV files using the DataFrame API.
Spark Reading CSV Files into DataFrame
Spark offers the spark.read.csv() method (equivalently, spark.read.format("csv")) to read a CSV file into a DataFrame. Here’s an example:
val df = spark.read.csv("/path/to/your/csvfile.csv")
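The same read can be expressed through the format API, which is convenient when the format is chosen at runtime:
val df = spark.read.format("csv").load("/path/to/your/csvfile.csv")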
Reading Multiple CSV Files
In cases where data is split across various CSV files, Spark provides an efficient way to read multiple files at once.
val df = spark.read.csv("/path/to/your/csvfile1.csv", "/path/to/your/csvfile2.csv")
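Since csv() is a varargs method, paths collected in a list can be passed with Scala’s splat syntax; a small sketch using the same hypothetical file names:
val paths = Seq("/path/to/your/csvfile1.csv", "/path/to/your/csvfile2.csv")
val df = spark.read.csv(paths: _*)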
Reading All CSV Files in a Directory
Rather than providing every file path individually, you can read all CSV files from a directory at once as follows:
val df = spark.read.csv("/path/to/directory/*")
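If the directory also contains non-CSV files, a glob pattern keeps the read restricted to CSV files:
val df = spark.read.csv("/path/to/directory/*.csv")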
Mode
In Apache Spark, when reading a CSV file into a DataFrame, you can use various modes to control the behavior, especially when dealing with issues like file parsing errors or schema mismatches. Here are the main modes available for reading CSV files in Spark:
Permissive Mode (Default)
Permissive mode is the default. Spark attempts to read every record; when it encounters a malformed row, it sets the fields it cannot parse to null rather than failing, and it can preserve the raw text of the bad row in a special column (named by columnNameOfCorruptRecord, _corrupt_record by default).
val df = spark.read
  .option("mode", "permissive")
  .csv("path/to/your/file.csv")
DROPMALFORMED Mode
In this mode, Spark skips and drops any rows that have parsing errors.
val df = spark.read
  .option("mode", "DROPMALFORMED")
  .csv("path/to/your/file.csv")
FAILFAST Mode
This mode stops the reading process as soon as it encounters any parsing error and fails the entire operation.
val df = spark.read
  .option("mode", "FAILFAST")
  .csv("path/to/your/file.csv")
Options while Reading CSV File
You can customize CSV reading with various options such as delimiter, inferSchema, header, quote, nullValue, and dateFormat:
Delimiter
The delimiter option specifies the character that separates fields in a CSV file. By default, Spark assumes the delimiter is a comma (,), but you can use this option to handle files with different delimiters, such as semicolons, tabs, or other characters.
Example:
val df = spark.read
  .option("delimiter", ";")
  .csv("path/to/your/file.csv")
InferSchema
The inferSchema option controls whether Spark should automatically infer the data type of each column. When set to true, Spark examines a sample of the data to determine the appropriate data type for each column. The default is false, in which case Spark reads all columns as strings.
Example:
val df = spark.read
  .option("inferSchema", "true")
  .csv("path/to/your/file.csv")
Header
The header option indicates whether the first row of the CSV file contains the column names. The default is false, in which case Spark generates default column names (_c0, _c1, and so on); set it to true to use the first row as column names.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/your/file.csv")
Set header to true when your CSV file has a header row; leave it at the default of false when it doesn’t.
Quote
The quote option allows you to specify the quote character used to enclose fields containing special characters or the delimiter itself. This is crucial when dealing with CSV files that use quotes to distinguish the actual data from the delimiter. The default quote character is the double quote (").
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "'")
  .csv("path/to/your/file.csv")
NullValue
The nullValue option lets you define a string that represents a null value in your CSV file. Spark replaces occurrences of this string with null in the resulting DataFrame.
Example:
val df = spark.read
  .option("nullValue", "NA")
  .csv("path/to/your/file.csv")
DateFormat
The dateFormat option specifies the format of date columns in the CSV file (timestamps have a separate timestampFormat option). It only takes effect when a column is actually parsed as a date, typically via an explicit DateType in a user-specified schema, as shown in the next section.
Example:
val df = spark.read
  .option("dateFormat", "yyyy-MM-dd")
  .csv("path/to/your/file.csv")
Here, the date format is set to “yyyy-MM-dd”. Adjust this format based on the actual format used in your CSV file.
Reading CSV Files with a User-Specified Schema
It is possible to manually specify the schema when reading a CSV file. This gives you more control over the data types in the DataFrame.
import org.apache.spark.sql.types._
val customSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("dob", DateType, true)
))
val df = spark.read
  .option("header", "true")
  .schema(customSchema)
  .csv("/path/to/your/csvfile.csv")
DataFrame Transformations
Transformations are operations in Spark that produce a new DataFrame from an existing one. Once you have a DataFrame in Spark, you can apply a wide range of transformations and actions to manipulate and analyze your data.
Example:
import spark.implicits._ // enables the $"column" syntax
val df2 = df.filter($"age" > 30).select($"name", $"age")
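Transformations chain naturally. A slightly fuller sketch, assuming the hypothetical name and age columns from the examples above:
import org.apache.spark.sql.functions._
// Average age of the filtered rows (assumed columns; adjust to your data)
val avgAge = df.filter($"age" > 30)
  .agg(avg($"age").alias("avg_age"))
avgAge.show()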
Writing DataFrame to CSV File
To save the processed data back to disk, you can write the DataFrame to CSV using the write.csv() method. Note that the path names a directory: Spark writes one or more part files into it rather than a single CSV file.
df.write.csv("/path/to/output/csvfile.csv")
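If you need a single output file and the data is small enough to fit in one partition, a common approach is to coalesce before writing:
// Fine for small outputs; one partition yields one part file
df.coalesce(1).write.csv("/path/to/output/csvfile.csv")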
Options When Writing CSV
There are several options you can configure when writing a DataFrame to a CSV file, such as delimiter, quote, and header.
df.write
  .option("delimiter", ";")
  .option("quote", "\"")
  .option("header", "true")
  .csv("/path/to/output/csvfile.csv")
Save Mode
When writing a DataFrame to a CSV file in Spark, the mode option allows you to specify the behavior when the target directory already exists. Here are various save modes you can use:
Overwrite
This mode will overwrite the content of the target directory if it already exists. It’s useful when you want to replace the existing data with the new content.
df.write
  .mode("overwrite")
  .csv("path/to/save/data")
Append
The append mode adds the new data to the existing content in the target directory. It’s handy when you want to add more data to an already existing dataset.
df.write
  .mode("append")
  .csv("path/to/save/data")
Ignore
The ignore mode avoids writing the DataFrame if the target directory already exists. No changes will be made to the existing data.
df.write
  .mode("ignore")
  .csv("path/to/save/data")
Error (Default)
If you don’t specify a mode, Spark uses errorIfExists (also accepted as "error") by default: if the target directory already exists, Spark throws an error, preventing accidental data overwrites. This default protects data integrity and is useful in production scenarios.
df.write
  .csv("path/to/save/data")
Choose the appropriate mode based on your use case and whether you want to overwrite, append, ignore, or explicitly handle errors when writing the DataFrame to a CSV file.
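Putting the pieces together, here is a minimal end-to-end sketch, assuming a hypothetical people.csv with name, age, and dob columns:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("CsvRoundTrip")
  .getOrCreate()
import spark.implicits._
// Read with a header row and inferred column types (hypothetical input path)
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/people.csv")
// Keep adults only, then write back as CSV, replacing any previous output
people.filter($"age" > 30)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/path/to/output/adults")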
Spark Read CSV File as DataFrame: Conclusion
In summary, Spark provides powerful tools for loading, transforming, and saving large-scale data. Its CSV capabilities, as examined in this article, are truly flexible and efficient, making Spark a solid choice for data analysis tasks.