Reading Multi-line JSON Files in PySpark

When dealing with big data, it’s common to encounter various file formats, and one of them is JSON (JavaScript Object Notation). JSON is a lightweight data interchange format that is easy for humans to read and write and easy for machines to parse and generate. In big data processing with PySpark, we regularly run into scenarios where we have to read multi-line JSON files. Below, we’ll explore how to handle this task using PySpark, the Python API for Apache Spark, which aims to make big data processing simple, fast, and scalable.

Understanding JSON File Formats

JSON files come in two flavors: single-line (also called JSON Lines or newline-delimited JSON) and multi-line. In a single-line file, each record occupies exactly one line, so every line is a complete, valid JSON object that can be parsed independently, which makes such files easy to split and process. In a multi-line file, a single JSON record spans several lines, which makes the file harder to read line by line. PySpark provides robust methods to handle both scenarios.
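
For contrast, here is roughly what the data used later in this article would look like in single-line (JSON Lines) form, with each complete record on its own line:


{"id": 1, "name": "John Doe", "attributes": {"hair_color": "brown", "eye_color": "blue"}}
{"id": 2, "name": "Jane Smith", "attributes": {"hair_color": "black", "eye_color": "green"}}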

Setting Up the PySpark Environment

Before we start reading multi-line JSON files, it’s essential to set up the PySpark environment. Begin by installing the required components:


# Installation command for PySpark
!pip install pyspark

After installing PySpark, we will create a `SparkSession`, which is the entry point to programming Spark with the Dataset and DataFrame API:


from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Multiline JSON Reader") \
    .getOrCreate()

Reading a Multi-line JSON File

To read a multi-line JSON file in PySpark, we use the `read` property of the `SparkSession`, which returns a `DataFrameReader`. Let’s begin with an example JSON file, `multiline_data.json`, which contains an array of JSON records, each spanning multiple lines:


[
  {
    "id": 1,
    "name": "John Doe",
    "attributes": {
      "hair_color": "brown",
      "eye_color": "blue"
    }
  },
  {
    "id": 2,
    "name": "Jane Smith",
    "attributes": {
      "hair_color": "black",
      "eye_color": "green"
    }
  }
]

This file cannot be read as is using the default settings since each record does not reside on a single line. However, PySpark has an option named `multiLine` that we can set to `True` to read such files.


# Read multi-line JSON file
df = spark.read.option("multiLine", True).json("multiline_data.json")

# Show the DataFrame
df.show(truncate=False)

If the code executes successfully, the output will display the DataFrame with records:


+---+----------+--------------+
|id |name      |attributes    |
+---+----------+--------------+
|1  |John Doe  |[brown, blue] |
|2  |Jane Smith|[black, green]|
+---+----------+--------------+

As seen, the records from the JSON file are correctly loaded into the PySpark DataFrame, where each JSON object is treated as a row in the DataFrame.
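
It is also worth checking the schema Spark inferred for the nested `attributes` field. A quick `printSchema` call, sketched below, should show `attributes` as a struct of string fields, `id` inferred as a long, and `name` as a string; note that Spark typically orders inferred JSON fields alphabetically, so the column order may differ from the order in the file.


# Inspect the schema Spark inferred from the JSON file
df.printSchema()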

Querying and Manipulating Data

Once the data is loaded, you might want to perform transformations or queries. PySpark DataFrames provide SQL-like methods and functions to work with the data:


# Select specific columns
df.select("id", "name").show()

# Flatten nested struct fields by selecting them with dot notation
from pyspark.sql.functions import col
df.select("id", "name", col("attributes.hair_color"), col("attributes.eye_color")).show()

If you run the above snippet, the output will be as follows:


+---+----------+
| id|      name|
+---+----------+
|  1|  John Doe|
|  2|Jane Smith|
+---+----------+

+---+----------+----------+---------+
| id|      name|hair_color|eye_color|
+---+----------+----------+---------+
|  1|  John Doe|     brown|     blue|
|  2|Jane Smith|     black|    green|
+---+----------+----------+---------+

This output shows a DataFrame where the attributes have been flattened, and each attribute is now represented as a separate column.
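
If you prefer plain SQL, you can also register the DataFrame as a temporary view and query it with `spark.sql`. The snippet below is a small sketch of that approach; the view name `people` is just an illustrative choice:


# Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("people")

spark.sql("""
    SELECT id, name, attributes.hair_color AS hair_color
    FROM people
    WHERE attributes.eye_color = 'blue'
""").show()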

Handling Corrupted Records and Configuring Schema

In real-world scenarios, JSON data may contain corrupted records that don’t match the desired schema or are not properly formatted. PySpark handles such cases through the parser `mode` (for example `PERMISSIVE`, the default) and the `columnNameOfCorruptRecord` option. Moreover, to ensure the correct data types are applied to each column, you may also define an explicit schema; note that when you supply your own schema, the corrupt-record column must be included in it for the malformed text to be captured.


from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for DataFrame
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("attributes", StringType(), nullable=True),  # Nested object kept as a raw JSON string for simplicity
    StructField("_corrupt_record", StringType(), nullable=True)  # Holds the raw text of malformed records
])

# Read JSON with schema and handling corrupt records
df_with_schema = spark.read.schema(schema) \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("multiline_data.json")

df_with_schema.show(truncate=False)

The output might reflect any corrupted records in the `_corrupt_record` column:


+----+----------+----------------------------+----------------+
|id  |name      |attributes                  |_corrupt_record |
+----+----------+----------------------------+----------------+
|1   |John Doe  |{"hair_color": "brown", ...}|null            |
|2   |Jane Smith|{"hair_color": "black", ...}|null            |
|null|null      |null                        |{malformed JSON}|
+----+----------+----------------------------+----------------+

Corrupted records are listed as `null` in normal columns, while their content is captured in the `_corrupt_record` column.
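
From here, a common next step is to split the clean rows from the malformed ones by filtering on the corrupt-record column. The sketch below assumes the schema defined above, which includes `_corrupt_record`; caching first sidesteps a Spark restriction on queries that reference only the corrupt-record column.


from pyspark.sql.functions import col

# Cache to avoid Spark's restriction on querying only the corrupt-record column
df_with_schema.cache()

# Keep only rows that parsed cleanly
valid_rows = df_with_schema.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")

# Collect the malformed records for inspection or reprocessing
bad_rows = df_with_schema.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record")

valid_rows.show(truncate=False)
bad_rows.show(truncate=False)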

Writing Data Back to JSON

Conversely, you might want to write a DataFrame back out as JSON. Keep in mind that `multiLine` is a read option only: the built-in JSON writer always produces line-delimited JSON (one record per line), and it writes a directory of part files rather than a single file:


# Write DataFrame as line-delimited JSON (one record per line) to an output directory
df.write.json("output_data_json")

This writes the DataFrame `df` as single-line JSON records into a directory named `output_data_json`, containing one or more part files. Spark does not natively produce pretty-printed, multi-line JSON output, and single-line JSON remains the more widely adopted format, so check whether the consuming application or service actually requires a multi-line document before going out of your way to produce one.
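
If a downstream consumer does need a single pretty-printed JSON document, one simple approach is to collect the rows to the driver and dump them with Python’s `json` module. This is only a sketch for small result sets, since it pulls all data onto the driver; the output path `pretty_output.json` is just an illustrative name.


import json

# Collect rows to the driver and convert them to plain Python dictionaries
records = [row.asDict(recursive=True) for row in df.collect()]

# Write a single, indented (multi-line) JSON array
with open("pretty_output.json", "w") as f:
    json.dump(records, f, indent=2)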

Conclusion

Handling multi-line JSON files in PySpark is simple once you’re aware of the `multiLine` option on the `DataFrameReader` returned by `spark.read`. With the correct settings and, where needed, an explicit schema, you can deal with complex and nested JSON data effectively. Moreover, being able to handle corrupted records and knowing how to query and manipulate data once it is loaded into DataFrames are critical skills for anyone working with PySpark and big data.

PySpark offers powerful and flexible tools for processing JSON data, whether you are reading multi-line JSON files or writing results back out as JSON. By mastering these capabilities, users can fully tap into the potential of big data processing with Python and Spark.
