When using Apache Spark, you may often encounter situations where you need to remove duplicate records from a DataFrame while keeping the first occurrence of each duplicate. This can be achieved with the `dropDuplicates` method, available in PySpark, Scala, and Java. Below are detailed explanations and code snippets for dropping duplicates and keeping the first entry in a Spark DataFrame in each of these languages.
PySpark
Suppose we have the following PySpark DataFrame:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Drop Duplicates").getOrCreate()

data = [
    (1, "Alice", 29),
    (2, "Bob", 25),
    (3, "Alice", 29),
    (4, "David", 32)
]

df = spark.createDataFrame(data, ["id", "name", "age"])
df.show()
```
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 3|Alice| 29|
| 4|David| 32|
+---+-----+---+
```
To drop duplicates and keep the first entry, we can use the `dropDuplicates` method and specify the subset of columns to consider for duplicate detection:
```python
df_unique = df.dropDuplicates(subset=["name", "age"])
df_unique.show()
```
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 4|David| 32|
+---+-----+---+
```
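One caveat worth knowing: `dropDuplicates` keeps exactly one row per duplicate group, but Spark does not guarantee which one. On a small, single-partition DataFrame like this it happens to be the first occurrence, yet on partitioned data the retained row can vary between runs. If you need a deterministic "first" row, a common pattern is a window function with `row_number`. Below is a minimal sketch, assuming the lowest `id` defines the first occurrence:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Number the rows within each (name, age) group by ascending id,
# then keep only the first row of each group.
w = Window.partitionBy("name", "age").orderBy("id")
df_first = (
    df.withColumn("rn", row_number().over(w))
      .filter(col("rn") == 1)
      .drop("rn")
)
df_first.show()
```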
Scala
Similarly, in Scala, you can achieve the same using the `dropDuplicates` method:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Drop Duplicates").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, "Alice", 29),
  (2, "Bob", 25),
  (3, "Alice", 29),
  (4, "David", 32)
)

val df = data.toDF("id", "name", "age")
df.show()
```
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 3|Alice| 29|
| 4|David| 32|
+---+-----+---+
```
To drop duplicates and keep the first entry in Scala:
```scala
val df_unique = df.dropDuplicates("name", "age")
df_unique.show()
```
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 4|David| 32|
+---+-----+---+
```
Java
In Java, the process is slightly more verbose but follows the same logic. Here’s an example:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DropDuplicates {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Drop Duplicates")
                .getOrCreate();

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice", 29),
                RowFactory.create(2, "Bob", 25),
                RowFactory.create(3, "Alice", 29),
                RowFactory.create(4, "David", 32)
        );

        // Define the schema explicitly, since Row objects carry no type info.
        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, false, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
        });

        Dataset<Row> df = spark.createDataFrame(data, schema);
        df.show();

        // Deduplicate on the name and age columns.
        Dataset<Row> df_unique = df.dropDuplicates("name", "age");
        df_unique.show();
    }
}
```
Running this program prints the original DataFrame followed by the deduplicated result:

```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 3|Alice| 29|
| 4|David| 32|
+---+-----+---+

+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 25|
| 4|David| 32|
+---+-----+---+
```
In these examples, we've shown how to drop duplicates based on a subset of columns (`name` and `age`) in PySpark, Scala, and Java. Spark's `dropDuplicates` method handles deduplication efficiently, but remember that it retains one arbitrary row per duplicate group; in these small, single-partition examples that happens to be the first occurrence, and for a guaranteed "first" row you can use the window-function pattern shown in the PySpark section.
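As a closing note, calling `dropDuplicates` with no column arguments considers all columns, which makes it equivalent to `distinct()`. A quick PySpark illustration:

```python
# Both calls deduplicate on all columns (id, name, age).
# Since the id values differ, every row in our sample DataFrame survives.
df.dropDuplicates().show()
df.distinct().show()
```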