Using JDBC (Java Database Connectivity) in PySpark provides an efficient way to connect to a variety of databases and perform read and write operations. Below is an example that demonstrates how to use JDBC to read data from a database and write the processed data back to another (or the same) database.
Using JDBC Source to Read Data
First, let’s start by reading data from a database using JDBC:
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder \
    .appName("JDBC Example") \
    .getOrCreate()
# JDBC connection properties
jdbc_url = "jdbc:mysql://localhost:3306/dbname"
table_name = "employees"
properties = {
    "user": "username",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}
# Read data from the database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
# Show the DataFrame
df.show()
Output:
+---+-------+--------+------+
| id|   name|location|salary|
+---+-------+--------+------+
|  1|  Alice|New York|  7000|
|  2|    Bob| Chicago|  8000|
|  3|Charlie|New York|  6200|
|  4|  David| Chicago|  6500|
|  5| Edward|New York|  6100|
+---+-------+--------+------+
Processing Data
You can then perform any transformation or action on the DataFrame, such as filtering or aggregating:
# Select employees from New York
ny_employees = df.filter(df.location == "New York")
# Show the filtered DataFrame
ny_employees.show()
Output:
+---+-------+--------+------+
| id|   name|location|salary|
+---+-------+--------+------+
|  1|  Alice|New York|  7000|
|  3|Charlie|New York|  6200|
|  5| Edward|New York|  6100|
+---+-------+--------+------+
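The example above only filters; since aggregation was mentioned as well, here is a minimal aggregation sketch using the same sample columns (it assumes `df` is the DataFrame read earlier):
# Compute the average salary per location (aggregation example)
from pyspark.sql import functions as F

avg_salary_df = df.groupBy("location").agg(F.avg("salary").alias("avg_salary"))
avg_salary_df.show()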
Using JDBC Source to Write Data
Once you have the processed DataFrame, you can write it back to a database:
# JDBC target properties
target_jdbc_url = "jdbc:mysql://localhost:3306/target_dbname"
target_table_name = "ny_employees"
# Write the DataFrame back to the database
ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="overwrite", properties=properties)
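As a quick sanity check, you can read the target table back with the same `read.jdbc` call; this sketch assumes the target database accepts the same credentials and driver defined in `properties`:
# Read the newly written table back to verify the contents
written_df = spark.read.jdbc(url=target_jdbc_url, table=target_table_name, properties=properties)
written_df.show()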
Here is a step-by-step explanation of the above code snippets:
Step 1: Create a Spark Session
A Spark session is created using `SparkSession.builder`; the resulting `SparkSession` is the entry point to programming with Spark:
spark = SparkSession.builder \
    .appName("JDBC Example") \
    .getOrCreate()
Step 2: Define JDBC Connection Properties
You need to provide the JDBC URL, the table name, and any properties required for the database connection. In this case, we are connecting to a MySQL database:
jdbc_url = "jdbc:mysql://localhost:3306/dbname"
table_name = "employees"
properties = {
    "user": "username",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}
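Note that the MySQL driver class named in `properties` is not bundled with Spark, so the Connector/J JAR must be on the Spark classpath (for example via `--jars` on `spark-submit`). One way to do this from code is the `spark.jars.packages` setting when building the session; the Maven coordinate and version below are illustrative, so match them to your MySQL server:
# Pull the MySQL Connector/J driver in when the session is created
# (coordinate and version are examples; adjust to your environment)
spark = SparkSession.builder \
    .appName("JDBC Example") \
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.0.33") \
    .getOrCreate()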
Step 3: Read Data from the Database
Use the `read.jdbc` method to load the data into a DataFrame:
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
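For large tables, `read.jdbc` also accepts partitioning arguments (`column`, `lowerBound`, `upperBound`, `numPartitions`) so the read is split across parallel tasks instead of a single connection. A minimal sketch, assuming `id` is a numeric column and that the bounds below are replaced with its actual minimum and maximum:
# Partitioned read: Spark issues one query per partition over the id range
# (the bounds here are placeholders; use the real min/max of the column)
df = spark.read.jdbc(
    url=jdbc_url,
    table=table_name,
    column="id",
    lowerBound=1,
    upperBound=100000,
    numPartitions=4,
    properties=properties
)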
Step 4: Process the Data
Filter or transform the DataFrame as needed. In this case, we filter employees from New York:
ny_employees = df.filter(df.location == "New York")
Step 5: Write Data Back to the Database
Use the `write.jdbc` method to save the DataFrame back to the database:
ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="overwrite", properties=properties)
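The `mode` argument controls what happens when the target table already exists: "overwrite" replaces its contents, "append" adds rows, and "ignore" or "error"/"errorifexists" are the other standard save modes. For example, to append instead of overwriting:
# Append to the target table instead of replacing it
ny_employees.write \
    .jdbc(url=target_jdbc_url, table=target_table_name, mode="append", properties=properties)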
By following these steps, you can efficiently read from and write to a relational database using PySpark and JDBC.