Pivoting and Unpivoting Spark DataFrames

Data manipulation is a fundamental part of data analysis, and reshaping data plays a critical role in it. In Apache Spark, pivoting and unpivoting are the two transformations used to reorient a DataFrame into the shape an analysis needs. Pivoting rotates data from rows into columns: the distinct values of one column become new columns, and an aggregation function fills in each cell, which often makes comparisons easier to read. Unpivoting, also known as “melting,” is the reverse operation: it turns columns back into rows, typically to normalize denormalized data. In this article, we cover pivoting and unpivoting DataFrames in Spark using the Scala programming language.

Introduction to Pivoting in Apache Spark

Apache Spark is a distributed data processing engine designed for fast computation. Its DataFrame abstraction is a distributed collection of data organized into named columns, similar to a table in a relational database but optimized by Spark's Catalyst query optimizer. The DataFrame API has a built-in pivot operation for reorienting a DataFrame to suit specific requirements.

Understanding Pivoting

To pivot a DataFrame in Spark, you chain three steps: groupBy chooses the column or columns that identify each output row, pivot turns the distinct values of another column into new columns, and an aggregation such as sum, count, or avg computes the value in each cell.
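The groupBy, pivot, aggregate chain also accepts several aggregations at once through agg. The following is a minimal sketch, assuming a local SparkSession and the same sample sales data used in the example below; the aliases total and largest are illustrative names chosen here, not anything Spark requires.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

val spark = SparkSession.builder()
  .appName("PivotMultiAggExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Same sample sales data as in the pivoting example
val sales = Seq(
  ("John", "Apple", 2),
  ("John", "Orange", 3),
  ("John", "Banana", 1),
  ("Emily", "Apple", 4),
  ("Emily", "Banana", 2)
).toDF("Name", "Fruit", "Quantity")

// groupBy: one output row per Name
// pivot:   one group of output columns per distinct Fruit
// agg:     the aggregations computed for every (Name, Fruit) cell
val multiAgg = sales
  .groupBy("Name")
  .pivot("Fruit")
  .agg(sum("Quantity").as("total"), max("Quantity").as("largest"))

multiAgg.show()
```

With more than one aggregation, Spark names each output column `<pivot value>_<alias>`, so this produces columns such as Apple_total and Apple_largest; with a single aggregation, each column is named after the pivot value alone.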

Example of Pivoting a DataFrame

Let’s consider an example DataFrame that includes sales data:


import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("PivotExample")
  .master("local[*]")
  .getOrCreate()

// Sample data
val salesData = Seq(
  ("John", "Apple", 2),
  ("John", "Orange", 3),
  ("John", "Banana", 1),
  ("Emily", "Apple", 4),
  ("Emily", "Banana", 2)
)

// Creating a DataFrame
import spark.implicits._
val df: DataFrame = salesData.toDF("Name", "Fruit", "Quantity")

// Pivot
val pivotedDf = df.groupBy("Name").pivot("Fruit").sum("Quantity")

pivotedDf.show()

The output should look like this (row order may vary, because groupBy does not sort its result):


+-----+-----+------+------+
| Name|Apple|Banana|Orange|
+-----+-----+------+------+
|Emily|    4|     2|  null|
| John|    2|     1|     3|
+-----+-----+------+------+

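Here, each distinct value of the Fruit column has become its own column, with one row per Name and the quantities summed in each cell. Because no list of pivot values was given, Spark collected the distinct fruits itself and sorted them, which is why the columns appear in alphabetical order. Emily has no Orange rows, so that cell is null rather than 0.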
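If a report should show 0 instead of null for combinations that never occurred, the DataFrame's na functions can fill the gaps. This is a minimal sketch under the same local-session assumption. Keep in mind that 0 and null mean different things (no sales versus no data), so filling is a presentation choice rather than a correction.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PivotFillNulls")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("John", "Apple", 2),
  ("John", "Orange", 3),
  ("John", "Banana", 1),
  ("Emily", "Apple", 4),
  ("Emily", "Banana", 2)
).toDF("Name", "Fruit", "Quantity")

val pivoted = sales.groupBy("Name").pivot("Fruit").sum("Quantity")

// na.fill(Long) replaces null (and NaN) values in numeric columns only,
// so the string Name column is left unchanged
val filled = pivoted.na.fill(0L)

filled.show()
```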

Advanced Pivoting Techniques

Sometimes, you may need to deal with a dynamic number of pivot values which are not known ahead of time. Under such circumstances, you can retrieve the distinct values first and then pass them to the pivot function.

Pivoting With Dynamic Values

Here is an example of how to perform a pivot operation with dynamic column values:


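// Collect the distinct fruit names to the driver, sorted so the column order is stable
val distinctFruits: Seq[String] = df.select("Fruit").distinct().as[String].collect().sorted.toSeq
val pivotedDynamicDf = df.groupBy("Name").pivot("Fruit", distinctFruits).sum("Quantity")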

pivotedDynamicDf.show()

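The collect() call brings the distinct fruit names back to the driver as a local array, which is then passed to pivot. If you omit the values list, Spark runs an equivalent distinct query itself (and fails if it finds more than spark.sql.pivotMaxValues distinct values, 10,000 by default), so both forms produce the same columns. Supplying the list explicitly lets you control which values become columns and in what order, and when the values are already known, for example from a reference table, it spares Spark that extra pass over the data.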
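When the pivot values are known in advance, they can be written out literally instead of collected. The sketch below (same local-session assumption) passes a hand-picked list: only the listed values become columns, in the order given, and rows with any other value, Banana here, do not contribute to the result.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PivotExplicitValues")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("John", "Apple", 2),
  ("John", "Orange", 3),
  ("John", "Banana", 1),
  ("Emily", "Apple", 4),
  ("Emily", "Banana", 2)
).toDF("Name", "Fruit", "Quantity")

// Only Orange and Apple become columns, in exactly this order;
// no extra job is needed to discover the pivot values
val applesAndOranges = sales
  .groupBy("Name")
  .pivot("Fruit", Seq("Orange", "Apple"))
  .sum("Quantity")

applesAndOranges.show()
```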

Introduction to Unpivoting in Apache Spark

Unpivoting is the reverse of pivoting. Before Spark 3.4 there was no dedicated unpivot function, so the usual techniques were the stack generator function inside selectExpr, an array of structs combined with explode, or a series of unions. Spark 3.4 added a Dataset.unpivot method, with melt as an alias.
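On Spark 3.4 and later, Dataset.unpivot does the reshaping in one call: id columns are kept as they are, and each value column becomes a separate row. A minimal sketch, assuming Spark 3.4+, a local session, and a hand-built wide DataFrame shaped like the pivoted result in this article:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("UnpivotApiExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Wide table: one column per fruit, null where there were no sales
val wide = Seq[(String, Option[Long], Option[Long], Option[Long])](
  ("Emily", Some(4L), Some(2L), None),
  ("John", Some(2L), Some(1L), Some(3L))
).toDF("Name", "Apple", "Banana", "Orange")

// Requires Spark 3.4 or later.
// Arguments: id columns, value columns, name for the column holding the
// former column names, name for the column holding their values
val unpivoted = wide.unpivot(
  Array(col("Name")),
  Array(col("Apple"), col("Banana"), col("Orange")),
  "Fruit",
  "Quantity"
)

unpivoted.show()
```

All value columns must share a compatible type (here, all are long). To drop the row produced by Emily's missing oranges, add a filter such as `col("Quantity").isNotNull`.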

Understanding Unpivoting

Unpivoting involves converting columns into rows, essentially normalizing data where you have one row for each attribute-value pair.
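The one-row-per-attribute idea can be written directly as a series of unions, one branch per value column. This sketch assumes a local session and a hand-built wide DataFrame. It is easy to read, but each branch typically re-reads the input, so stack or explode usually scale better when there are many columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder()
  .appName("UnionUnpivotExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val wide = Seq[(String, Option[Long], Option[Long], Option[Long])](
  ("Emily", Some(4L), Some(2L), None),
  ("John", Some(2L), Some(1L), Some(3L))
).toDF("Name", "Apple", "Banana", "Orange")

// One narrow DataFrame per value column: (Name, attribute name, attribute value)
val perFruit: Seq[DataFrame] = Seq("Apple", "Banana", "Orange").map { fruit =>
  wide.select(col("Name"), lit(fruit).as("Fruit"), col(fruit).as("Quantity"))
}

// Stack the pieces vertically; unionByName matches columns by name, not position
val unionUnpivoted = perFruit.reduce(_ unionByName _)

unionUnpivoted.show()
```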

Example of Unpivoting a DataFrame

To illustrate how to unpivot a DataFrame, let’s use the output from our pivot example:


val unpivotedDf = pivotedDf.selectExpr("Name", "stack(3, 'Apple', Apple, 'Banana', Banana, 'Orange', Orange) as (Fruit,Quantity)")

unpivotedDf.show()

The output for this operation would be something like:


+-----+------+--------+
| Name| Fruit|Quantity|
+-----+------+--------+
|Emily| Apple|       4|
|Emily|Banana|       2|
|Emily|Orange|    null|
| John| Apple|       2|
| John|Banana|       1|
| John|Orange|       3|
+-----+------+--------+

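In this example, stack converts the three pivoted columns back into rows. Its first argument is the number of rows to generate for each input row (three here, one per fruit); the remaining arguments are split evenly across those rows, so each generated row receives one pair made of a literal fruit name and the corresponding column. The result is back in long format, but it is not identical to the original df: the Emily/Orange combination now appears as a row with a null Quantity, which you can drop with a filter such as Quantity IS NOT NULL.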
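Hard-coding the stack arguments gets tedious with many columns. The sketch below builds the expression from a list of column names with a hypothetical helper, stackUnpivot (not a Spark API), drops the null rows, and checks that the round trip recovers the original sales rows. It assumes a local session and column names that contain no single quotes or backticks.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("DynamicStackExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("John", "Apple", 2),
  ("John", "Orange", 3),
  ("John", "Banana", 1),
  ("Emily", "Apple", 4),
  ("Emily", "Banana", 2)
).toDF("Name", "Fruit", "Quantity")
val pivoted = sales.groupBy("Name").pivot("Fruit").sum("Quantity")

// Hypothetical helper: builds "stack(n, 'c1', `c1`, 'c2', `c2`, ...)".
// Assumes column names contain no single quotes or backticks.
def stackUnpivot(df: DataFrame, idCol: String, valueCols: Seq[String],
                 keyName: String, valueName: String): DataFrame = {
  val pairs = valueCols.map(c => s"'$c', `$c`").mkString(", ")
  df.selectExpr(idCol, s"stack(${valueCols.size}, $pairs) as ($keyName, $valueName)")
}

// Every column except the id column is treated as a value column
val fruitCols = pivoted.columns.filterNot(_ == "Name").toSeq
val roundTrip = stackUnpivot(pivoted, "Name", fruitCols, "Fruit", "Quantity")
  .filter(col("Quantity").isNotNull) // drop combinations that never occurred

// The original rows, with Quantity widened to long to match sum()'s output type
val original = sales.select($"Name", $"Fruit", $"Quantity".cast("long")).collect().toSet

roundTrip.show()
```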

Unpivoting Using Complex Types

Another approach to unpivoting in Spark uses complex types and the explode function: for each row, you build an array of structs (key-value pairs), then explode the array so that each element becomes a separate row.

Unpivoting With Complex Types and Explode Function


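import org.apache.spark.sql.functions.{array, explode, lit, struct}

// Each struct pairs a fruit name with that fruit's quantity column.
// The fields are named explicitly because array() requires every element to have
// the same struct type; without names, the second field would be named after its
// source column (Apple, Banana, Orange), giving the elements different types.
val unpivotedComplexDf = pivotedDf.withColumn(
  "FruitQuantity",
  explode(
    array(
      struct(lit("Apple").as("Fruit"), $"Apple".as("Quantity")),
      struct(lit("Banana").as("Fruit"), $"Banana".as("Quantity")),
      struct(lit("Orange").as("Fruit"), $"Orange".as("Quantity"))
    )
  )
).select(
  $"Name",
  $"FruitQuantity.Fruit".alias("Fruit"),
  $"FruitQuantity.Quantity".alias("Quantity")
)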

unpivotedComplexDf.show()

The output will resemble our previous unpivoted example. We use struct to create the key-value pairs and explode to transform each element of the array into a separate row.


+-----+------+--------+
| Name| Fruit|Quantity|
+-----+------+--------+
|Emily| Apple|       4|
|Emily|Banana|       2|
|Emily|Orange|    null|
| John| Apple|       2|
| John|Banana|       1|
| John|Orange|       3|
+-----+------+--------+
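A variant of the same idea uses a map column instead of an array of structs. Exploding a map produces one row per entry in two columns named key and value, so no struct field names are needed. A minimal sketch, assuming a local session and a hand-built wide DataFrame; map keys must not be null, while values may be.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, lit, map}

val spark = SparkSession.builder()
  .appName("MapExplodeUnpivot")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val wide = Seq[(String, Option[Long], Option[Long], Option[Long])](
  ("Emily", Some(4L), Some(2L), None),
  ("John", Some(2L), Some(1L), Some(3L))
).toDF("Name", "Apple", "Banana", "Orange")

// map(k1, v1, k2, v2, ...) builds one map per row; explode then
// emits one row per map entry in columns named key and value
val mapUnpivoted = wide
  .select(
    col("Name"),
    explode(map(
      lit("Apple"), col("Apple"),
      lit("Banana"), col("Banana"),
      lit("Orange"), col("Orange")
    ))
  )
  .withColumnRenamed("key", "Fruit")
  .withColumnRenamed("value", "Quantity")

mapUnpivoted.show()
```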

Whether to pivot or unpivot depends on the shape your analysis needs. Pivoting produces a wide, report-style table that makes side-by-side comparisons easy to read, while unpivoting produces a long, normalized table that is easier to filter, join, and aggregate further.

To summarize the techniques covered here: pivoting in Spark with Scala uses groupBy followed by pivot and an aggregation, optionally with an explicit list of pivot values, and unpivoting uses the stack function, an array or map combined with explode, or, in Spark 3.4 and later, Dataset.unpivot. Being able to reshape data this way can noticeably simplify data processing workflows.

About Editorial Team

Our Editorial Team is made up of tech enthusiasts who are highly skilled in Apache Spark, PySpark, and Machine Learning. They are also proficient in Python, Pandas, R, Hive, PostgreSQL, Snowflake, and Databricks. They aren't just experts; they are passionate teachers. They are dedicated to making complex data concepts easy to understand through engaging and simple tutorials with examples.
