Exploring Spark 3.0 Features and Examples

Apache Spark 3.0 represents a significant milestone in the evolution of the open-source, distributed computing system that has become one of the leading platforms for large-scale data processing. Released in June 2020, Spark 3.0 introduces a variety of new features and enhancements that improve performance, usability, and compatibility. In this comprehensive guide, we will explore the key features of Spark 3.0 and provide examples using Scala, one of Spark’s primary interface languages.

Adaptive Query Execution

One of the most significant enhancements in Spark 3.0 is Adaptive Query Execution (AQE). AQE is a mechanism that allows Spark to adjust its execution plan on the fly based on real runtime statistics. This dynamic adjustment leads to more efficient use of resources and faster execution times for complex queries.

To illustrate the benefits of AQE, let’s look at a simple example:


import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Adaptive Query Execution Example")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

val df = spark.range(0, 10000000)
df.createOrReplaceTempView("test_table")

val result = spark.sql("SELECT * FROM test_table WHERE id % 2 = 0")
result.show()

Normally, Spark generates a query plan for the SQL statement up front and uses it unchanged throughout execution. With AQE enabled, Spark can revise the plan mid-query based on runtime statistics, for example by converting a sort-merge join to a broadcast join, coalescing small shuffle partitions, or splitting skewed join partitions. The output of the code above is the first 20 even numbers between 0 and 9999999.
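
Each of these runtime optimizations is controlled by its own configuration flag (spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.skewJoin.enabled, both on by default once AQE itself is enabled). The sketch below sets them explicitly and joins two DataFrames of very different sizes so that the AdaptiveSparkPlan wrapper shows up in the explain output; the exact plan Spark chooses depends on your data and cluster.

// AQE must be enabled for the sub-features below to take effect.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") // merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")           // split skewed join partitions

val large = spark.range(0, 1000000).withColumnRenamed("id", "key")
val small = spark.range(0, 1000).withColumnRenamed("id", "key")

// With AQE, the physical plan is wrapped in an AdaptiveSparkPlan node and the
// join strategy can change at runtime (for example to a broadcast join) based
// on the sizes actually observed after the shuffle.
val joined = large.join(small, "key")
joined.explain()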

Data Source for Binary Files:

Reads various binary files (images, PDFs, ZIPs, etc.) directly into DataFrames.
Example:


val binaryDF = spark.read.format("binaryFile").load("path/to/binary/files")
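
The resulting DataFrame has one row per file with four columns: path, modificationTime, length, and content (the raw bytes). A slightly fuller sketch, using the same hypothetical directory and the pathGlobFilter option to restrict the load to PDF files:

// Load only files whose names match the glob pattern.
val pdfDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.pdf")
  .load("path/to/binary/files")

// Inspect the file metadata without touching the raw bytes in "content".
pdfDF.select("path", "length", "modificationTime").show(truncate = false)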

Structured Streaming UI:

Spark 3.0 adds a dedicated Structured Streaming tab to the Spark UI for monitoring running streaming queries, with detailed statistics such as input rate, process rate, batch duration, and operation duration for each micro-batch.
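
Any running streaming query will populate the new tab. Below is a minimal sketch using the built-in rate source, which generates rows at a fixed rate; while the query runs, its input rate, process rate, and batch durations appear under the Structured Streaming tab of the Spark UI.

// A throwaway streaming query whose metrics show up in the Structured Streaming tab.
val stream = spark.readStream
  .format("rate")                // built-in source emitting (timestamp, value) rows
  .option("rowsPerSecond", "10")
  .load()

val query = stream.writeStream
  .format("console")
  .outputMode("append")
  .start()

// Let it run for about 30 seconds, then stop it.
query.awaitTermination(30000)
query.stop()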

Enhanced Python and R Support:

  • Speeds up R UDFs by up to 40x.
  • Redesigns pandas UDF API with type hints and new UDF types.
  • Example: Use pandas UDFs for vectorized operations on DataFrames.

Dynamic Partition Pruning

Dynamic Partition Pruning (DPP) is another significant optimization in Spark 3.0 for improving the performance of queries that involve large partitioned tables. By applying predicates dynamically based on the data being processed, Spark can skip scanning unnecessary partitions, thus saving on I/O and computing resources.

Here’s an example demonstrating dynamic partition pruning:


import spark.implicits._

// Write a small fact table to disk, partitioned by date.
val salesData = Seq(
  (1, "2020-01-01", 100),
  (2, "2020-01-02", 200),
  (3, "2020-01-03", 300)
).toDF("id", "date", "amount")

salesData.write.mode("overwrite").partitionBy("date").parquet("/tmp/sales")
spark.read.parquet("/tmp/sales").createOrReplaceTempView("sales")

// A small dimension table; the filter on it is what DPP pushes into the fact table scan.
Seq(("2020-01-02", "promo")).toDF("date", "label").createOrReplaceTempView("dim_dates")

val result = spark.sql(
  """SELECT s.* FROM sales s
    |JOIN dim_dates d ON s.date = d.date
    |WHERE d.label = 'promo'""".stripMargin)

result.explain()
result.show()

With dynamic partition pruning (enabled by default in Spark 3.0 via spark.sql.optimizer.dynamicPartitionPruning.enabled), the filter on the dimension table is converted at runtime into a partition filter on the sales table, so only the date=2020-01-02 partition is scanned. The explain plan shows this as a dynamic pruning expression in the partition filters of the scan, and the query avoids reading the other partitions entirely.

Enhanced ANSI SQL Compliance

Spark 3.0 has made strides towards ANSI SQL compliance, which improves the compatibility with other SQL-based systems and provides more predictable behavior for Spark SQL. This release introduces several SQL features and enhancements conforming to the ANSI standard.

One example is the new ANSI mode, controlled by the spark.sql.ansi.enabled flag, which makes Spark fail on operations that ANSI SQL treats as errors instead of silently returning NULL:


spark.sql("SET spark.sql.ansi.enabled = true")

// Under ANSI mode, an invalid cast raises a runtime error instead of returning NULL.
val df = spark.sql("SELECT CAST('abc' AS INT) AS parsed")
df.show()

With the flag enabled, the invalid cast above fails with a runtime exception, matching the behavior of other ANSI-compliant databases; with the flag left at its default of false, Spark falls back to its legacy behavior and returns NULL.
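
ANSI mode also changes how arithmetic overflow is handled; for example, integer overflow raises an error instead of silently wrapping around (the exact exception and message vary by Spark version):

// With spark.sql.ansi.enabled = true this fails with an arithmetic error;
// with ANSI mode off it silently overflows to -2147483648.
spark.sql("SELECT 2147483647 + 1 AS overflowed").show()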

GPU Acceleration and the RAPIDS Plugin

Spark 3.0 has included support for GPU acceleration through the RAPIDS Accelerator for Apache Spark. This allows certain operations to be offloaded to GPUs, leading to significant speed-ups for the eligible workloads, particularly in machine learning and data analytics applications.

GPU acceleration is enabled through a plugin system. Below is an example of how to configure Spark to use the RAPIDS plugin:


val spark = SparkSession.builder()
  .appName("GPU Acceleration Example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.concurrentGpuTasks", "1")
  .getOrCreate()

Note that to run this example, you need the appropriate GPU hardware and software setup, including the CUDA libraries and the RAPIDS Accelerator jars on the driver and executor classpaths.
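
As a quick sanity check once the plugin is active, run a simple query and inspect the physical plan; operators handled by the RAPIDS Accelerator appear as GPU variants of the usual nodes. The sketch below is illustrative only, since the exact operator names and what gets offloaded depend on the plugin version and your configuration.

import org.apache.spark.sql.functions.col

// Assumes the session above was built with the RAPIDS plugin configured and
// that compatible GPU hardware, drivers, and jars are available.
val data = spark.range(0, 1000000)
  .withColumn("doubled", col("id") * 2)
  .filter(col("doubled") % 4 === 0)

// If the accelerator picked up these operators, the plan shows GPU-specific
// nodes (for example GpuProject and GpuFilter) instead of the CPU versions.
data.explain()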

New API Enhancements

Spark 3.0 introduces a number of API enhancements to further simplify working with datasets and dataframes. These include support for new data sources, simpler syntax for complex types, and improved null value handling.

For example, let’s explore improved support for complex types:


import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Define a nested schema: an "info" struct column with "name" and "age" fields.
val schema = new StructType()
  .add("id", IntegerType, nullable = false)
  .add("info", new StructType()
    .add("name", StringType, nullable = false)
    .add("age", IntegerType, nullable = false))

val rows = Seq(
  Row(1, Row("Alice", 10)),
  Row(2, Row("Bob", 15))
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(rows),
  schema
)

df.printSchema()
df.show()

This snippet defines a nested schema with the StructType builder API and constructs a DataFrame from rows that match it. The output is a DataFrame with a nested "info" column holding a struct with "name" and "age" fields.
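
Once the struct column is in place, nested fields can be addressed directly with dot notation and flattened into top-level columns:

import org.apache.spark.sql.functions.col

// Pull the nested struct fields out as ordinary columns.
df.select(col("id"), col("info.name").as("name"), col("info.age").as("age")).show()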

Performance Improvements

Spark 3.0 includes several under-the-hood performance improvements such as enhanced predicate pushdown, more efficient file listing for large directories in cloud storage, and optimizations to the Catalyst optimizer.

Let’s demonstrate predicate pushdown with a simple example involving reading from Parquet:


val sales = spark.read.format("parquet")
  .load("path/to/sales.parquet")
  .filter("year = 2022")

sales.explain()

With predicate pushdown, Spark pushes the filter down to the Parquet reader, so only the data relevant to the year 2022 is read. The explain plan lists the pushed filter on the Parquet scan node, which translates into far less I/O when dealing with large datasets.

Other Notable Features:

  • Support for Java 11, Hadoop 3, and Python 3.6+ (Python 2 is deprecated).
  • A new catalog plugin API (DataSource V2) that external table formats such as Delta Lake build on to provide ACID transactions for data lakes.

Conclusion

Apache Spark 3.0 introduces a wide array of features and enhancements that address both developer productivity and application performance. Adaptive Query Execution and Dynamic Partition Pruning optimize query execution without requiring user intervention. Increased ANSI SQL compliance improves compatibility and predictability, while GPU acceleration opens the door for significant speed-ups in certain workloads. Finally, API enhancements and performance improvements consolidate Spark’s position as a leading platform for big data processing.

Overall, Spark 3.0 is an exciting release that makes big data processing more efficient, adaptable, and user-friendly, and continues to expand the capabilities of the robust Spark ecosystem. As always, when working with any new version, it’s important to test and evaluate new features in the context of your specific workloads and infrastructure to fully leverage the benefits they provide.
