Master Data Combination: How to Use Left Outer Join in Spark SQL

In data analysis and data processing, joining tables or datasets is a critical operation that lets us merge data on a common set of keys. Apache Spark is a powerful data processing framework that provides several types of join operations through its SQL module. One of them is the left outer join, which combines records from two DataFrames (or tables): it keeps every record from the left DataFrame, attaches the matching records from the right DataFrame, and fills the right-side columns with null wherever no match exists. This comprehensive guide discusses how to perform left outer joins in Spark SQL using Scala.

Understanding Left Outer Joins

Before diving into implementation details, it’s essential to grasp the concept of left outer joins. This type of join takes two datasets, typically referred to as left and right, and matches each row from the left dataset with rows from the right dataset based on a common key or set of keys. If a row in the left dataset has no matching rows in the right dataset, the result set still includes this row from the left dataset, with null values filling the corresponding columns from the right dataset.

Setting Up the Spark Session

First things first, to perform any operation in Spark SQL using Scala, we need to set up the Spark Session, which is the entry point to all Spark SQL functionality.


import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Left Outer Join Example")
  .master("local[*]") // Use local mode for example purposes.
  .getOrCreate()
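
Optionally, you can quiet Spark’s console logging while experimenting locally, and stop the session once your application is done. These are small housekeeping steps, not required for the examples below.


// Optional: reduce log verbosity while experimenting locally.
spark.sparkContext.setLogLevel("WARN")

// Optional: release resources once the application has finished.
// spark.stop()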

With the Spark session established, we can now create DataFrames (the core data structure in Spark SQL) that we will subsequently join using the left outer join.

Creating Example DataFrames

To illustrate left outer joins, let’s create two simple DataFrames to join together. In practice, such DataFrames might represent tables in a database or data read from CSV, JSON, Parquet, or other file formats.


import spark.implicits._

val employees = Seq(
  (1, "Alice"),
  (2, "Bob"),
  (3, "Carol")
).toDF("id", "name")

val departments = Seq(
  (1, "HR"),
  (2, "Marketing"),
  (4, "Finance")
).toDF("emp_id", "dept_name")

employees.show()
departments.show()

Here, `employees` is a DataFrame with employee IDs and names, while `departments` is a DataFrame with employee IDs and department names. Note that one employee (Carol, ID 3) has no corresponding department, and one department (Finance, keyed on emp_id 4) has no corresponding employee.
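
Calling `show()` on the two DataFrames should print output along these lines:


+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3|Carol|
+---+-----+

+------+---------+
|emp_id|dept_name|
+------+---------+
|     1|       HR|
|     2|Marketing|
|     4|  Finance|
+------+---------+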

Performing a Left Outer Join

Next, we will perform the left outer join operation. In Spark SQL, DataFrame’s `join` method is used to join two DataFrames on a given join expression and join type.


val leftOuterJoinDf = employees.join(departments, employees("id") === departments("emp_id"), "left_outer")

leftOuterJoinDf.show()

After executing this code block, we should see the following output:


+---+-----+------+---------+
| id| name|emp_id|dept_name|
+---+-----+------+---------+
|  1|Alice|     1|       HR|
|  2|  Bob|     2|Marketing|
|  3|Carol|  null|     null|
+---+-----+------+---------+

This output indicates that employee Carol does not have a matching department entry and thus has null values for the `emp_id` and `dept_name` fields.
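
As a side note, Spark accepts a few equivalent spellings for this join type, so the following variants (shown only for completeness) produce the same result as `left_outer`:


// "left" and "leftouter" are accepted aliases for the left outer join type.
val sameResultA = employees.join(departments, employees("id") === departments("emp_id"), "left")
val sameResultB = employees.join(departments, employees("id") === departments("emp_id"), "leftouter")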

Joining on a Common Column Name

Alternatively, when the join key has the same name in both DataFrames, we can pass the column name (or a `Seq` of names) directly to `join`, and Spark will keep a single copy of the join column in the result. In our example, `departments` calls its key `emp_id`, so we first rename it to `id` to match the `employees` DataFrame.


val departmentsWithId = departments.withColumnRenamed("emp_id", "id")

val leftOuterJoinDfDsl = employees.join(departmentsWithId, Seq("id"), "left_outer")

leftOuterJoinDfDsl.show()

When executing this code snippet, the output is slightly different from the previous one: because we joined on a common column name, the join key appears only once in the result:


+---+-----+---------+
| id| name|dept_name|
+---+-----+---------+
|  1|Alice|       HR|
|  2|  Bob|Marketing|
|  3|Carol|     null|
+---+-----+---------+
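
Because unmatched left-side rows surface as nulls in the right-side columns, a common follow-up is to substitute a placeholder value. Here is a minimal sketch, assuming "Unassigned" is an acceptable default for our data:


// Replace nulls introduced by the left outer join with a placeholder department name.
val withDefaults = leftOuterJoinDfDsl.na.fill(Map("dept_name" -> "Unassigned"))

withDefaults.show()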

Using Spark SQL Queries for Left Outer Joins

Spark SQL also allows us to run SQL queries directly on DataFrames after registering them as temporary views. This method leverages actual SQL syntax, which might be more familiar for those with a SQL background.


employees.createOrReplaceTempView("employees")
departments.createOrReplaceTempView("departments")

val leftOuterJoinSql = spark.sql("""
  SELECT *
  FROM employees
  LEFT OUTER JOIN departments ON employees.id = departments.emp_id
""")
leftOuterJoinSql.show()

Running this query will yield a result similar to our first join operation:


+---+-----+------+---------+
| id| name|emp_id|dept_name|
+---+-----+------+---------+
|  1|Alice|     1|       HR|
|  2|  Bob|     2|Marketing|
|  3|Carol|  null|     null|
+---+-----+------+---------+
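
Building on the same temporary views, a common companion query isolates the left-side rows that found no match by filtering on a right-side key being NULL. A sketch of that pattern:


val unmatchedEmployees = spark.sql("""
  SELECT e.id, e.name
  FROM employees e
  LEFT OUTER JOIN departments d ON e.id = d.emp_id
  WHERE d.emp_id IS NULL
""")

unmatchedEmployees.show() // only Carol (id 3) should remain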

Handling Duplicate Column Names

When joining DataFrames that share a column name, the join itself usually succeeds, but any later reference to that column by its bare name raises an ambiguity error unless we handle the duplicate appropriately. We can either rename the columns before the join or drop one of the columns after the join.
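
To see the problem concretely, here is a minimal sketch; the two small DataFrames below are hypothetical and created only for this illustration:


val people  = Seq((1, "Alice")).toDF("id", "name")
val offices = Seq((1, "HR")).toDF("id", "dept_name")

// The join itself succeeds even though both sides have an "id" column...
val ambiguous = people.join(offices, people("id") === offices("id"), "left_outer")

// ...but referring to "id" by bare name afterwards fails with an ambiguity error:
// ambiguous.select("id")

// Disambiguating through the source DataFrame works:
ambiguous.select(people("id"), $"name", $"dept_name").show()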

Renaming Columns

To solve the issue of duplicate columns prior to performing the join, we can use the `withColumnRenamed` transformation to rename one of the columns.


val departmentsRenamed = departments.withColumnRenamed("emp_id", "department_emp_id")

val leftOuterJoinRenamed = employees.join(departmentsRenamed, $"id" === $"department_emp_id", "left_outer")

leftOuterJoinRenamed.show()

The resulting DataFrame will have unique column names:


+---+-----+-----------------+---------+
| id| name|department_emp_id|dept_name|
+---+-----+-----------------+---------+
|  1|Alice|                1|       HR|
|  2|  Bob|                2|Marketing|
|  3|Carol|             null|     null|
+---+-----+-----------------+---------+
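
As another option, DataFrame aliases let us qualify column references without renaming anything; a short sketch using the same `employees` and `departments` DataFrames:


// Give each side a short alias, then qualify columns through it.
val aliasedJoin = employees.as("e")
  .join(departments.as("d"), $"e.id" === $"d.emp_id", "left_outer")

aliasedJoin.select($"e.id", $"e.name", $"d.dept_name").show()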

Dropping Columns After the Join

Alternatively, if the redundant key column doesn’t add any value, we can drop it after performing the join. This is done using the `drop` method.


val leftOuterJoinDropped = leftOuterJoinDf.drop(departments("emp_id"))

leftOuterJoinDropped.show()

This would result in a DataFrame without the duplicate `emp_id` column:


+---+-----+---------+
| id| name|dept_name|
+---+-----+---------+
|  1|Alice|       HR|
|  2|  Bob|Marketing|
|  3|Carol|     null|
+---+-----+---------+
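
Note that `drop` also accepts a plain column name; since `emp_id` appears only once in this joined result, the following line is equivalent:


val leftOuterJoinDroppedByName = leftOuterJoinDf.drop("emp_id")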

Conclusion

Left outer joins are an essential tool in the data scientist’s toolbox, and Spark SQL provides a versatile set of options for performing them in Scala. Whether you prefer join expressions on the DataFrame API, joining on a common column name, or running SQL queries against temporary views, Spark has you covered. Remember to handle duplicate column names appropriately to avoid errors and confusion. With this guide, you should now have a solid understanding of how to perform left outer joins in Spark SQL, giving you the ability to merge datasets flexibly in your big data pipelines.

