How to Use JDBC Source to Read and Write Data in PySpark?

Using JDBC (Java Database Connectivity) in PySpark provides an efficient way to connect to a variety of databases and perform read and write operations. Below is an example that demonstrates how to use JDBC to read data from a database and write the processed data back to another (or the same) database.

Using JDBC Source to Read Data

First, let’s start by reading data from a database using JDBC:


from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("JDBC Example") \
    .getOrCreate()

# JDBC connection properties
jdbc_url = "jdbc:mysql://localhost:3306/dbname"
table_name = "employees"
properties = {
    "user": "username",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Read data from the database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)

# Show the DataFrame
df.show()

Output:


+---+-------+--------+------+
| id|   name|location|salary|
+---+-------+--------+------+
|  1|  Alice|New York|  7000|
|  2|    Bob| Chicago|  8000|
|  3|Charlie|New York|  6200|
|  4|  David| Chicago|  6500|
|  5| Edward|New York|  6100|
+---+-------+--------+------+
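
Equivalently, the same table can be read through the generic DataFrameReader API, which is convenient when you prefer to pass each connection detail as a separate option. This sketch reuses the variables defined above:


# Equivalent read using the generic JDBC data source
df_alt = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .load()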

Processing Data

You can then perform any transformation or action on the DataFrame, such as filtering or aggregating:


# Select employees from New York
ny_employees = df.filter(df.location == "New York")

# Show the DataFrame
ny_employees.show()

Output:


+---+-------+--------+------+
| id|   name|location|salary|
+---+-------+--------+------+
|  1|  Alice|New York|  7000|
|  3|Charlie|New York|  6200|
|  5| Edward|New York|  6100|
+---+-------+--------+------+
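
Aggregations follow the same pattern. As a quick sketch, here is how you could compute the average salary per location from the original DataFrame:


from pyspark.sql.functions import avg

# Average salary per location (aggregation on the original DataFrame)
avg_salary = df.groupBy("location").agg(avg("salary").alias("avg_salary"))
avg_salary.show()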

Using JDBC Source to Write Data

Once you have the processed DataFrame, you can write it back to a database:


# JDBC target properties
target_jdbc_url = "jdbc:mysql://localhost:3306/target_dbname"
target_table_name = "ny_employees"

# Write the DataFrame back to the database
ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="overwrite", properties=properties)

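The same write can also be expressed through the generic DataFrameWriter API, which makes it easy to pass extra JDBC options. In this sketch the batchsize value is only an assumed starting point; tune it for your database:


# Equivalent write using the generic JDBC sink.
# The batchsize of 1000 is an assumed value; adjust it for your workload.
ny_employees.write \
    .format("jdbc") \
    .option("url", target_jdbc_url) \
    .option("dbtable", target_table_name) \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .option("batchsize", "1000") \
    .mode("overwrite") \
    .save()
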
Here is a step-by-step explanation of the above code snippets:

Step 1: Create a Spark Session

A Spark session is created using `SparkSession.builder`, which is the entry point to programming with Spark:


spark = SparkSession.builder \
    .appName("JDBC Example") \
    .getOrCreate()

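Spark also needs the JDBC driver on its classpath. One way to provide it is through the `spark.jars.packages` configuration; the Maven coordinates below are an assumption, so use the connector version that matches your MySQL server:


# Pull the MySQL JDBC driver at session start-up (assumed coordinates/version)
spark = SparkSession.builder \
    .appName("JDBC Example") \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33") \
    .getOrCreate()
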
Step 2: Define JDBC Connection Properties

You need to provide the JDBC URL, the table name, and any properties required for the database connection. In this case, we are connecting to a MySQL database:


jdbc_url = "jdbc:mysql://localhost:3306/dbname"
table_name = "employees"
properties = {
    "user": "username",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}

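Only the URL and the driver class change when you target a different database. For example, here is a minimal sketch of the same properties for PostgreSQL, with placeholder connection details:


# The same pattern for a PostgreSQL database (illustrative values only)
jdbc_url = "jdbc:postgresql://localhost:5432/dbname"
properties = {
    "user": "username",
    "password": "password",
    "driver": "org.postgresql.Driver"
}
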
Step 3: Read Data from the Database

Use the `read.jdbc` method to load the data into a DataFrame:


df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)

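For large tables, `read.jdbc` can also split the read across executors by partitioning on a numeric column. The column, bounds, and partition count below are assumptions you would derive from your own data:


# Parallel read partitioned on the numeric id column.
# The bounds and partition count are assumed values for illustration.
df = spark.read.jdbc(
    url=jdbc_url,
    table=table_name,
    column="id",
    lowerBound=1,
    upperBound=1000,
    numPartitions=4,
    properties=properties
)
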
Step 4: Process the Data

Filter or transform the DataFrame as needed. In this case, we keep only the employees located in New York:


ny_employees = df.filter(df.location == "New York")

Step 5: Write Data Back to the Database

Use the `write.jdbc` method to save the DataFrame back to the database:


ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="overwrite", properties=properties)

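The `mode` argument controls what happens when the target table already exists: "overwrite" replaces it, "append" adds rows, "ignore" skips the write, and "error" (the default) raises an exception. For example, to add rows to an existing table instead of replacing it:


# Append the rows to an existing table instead of replacing it
ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="append", properties=properties)
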
By following these steps, you can efficiently read from and write to a relational database using PySpark and JDBC.
