Optimizing Spark Partitions to Eliminate Performance Bottlenecks: A Complete Guide

Tuning spark.sql.shuffle.partitions and enabling Adaptive Query Execution (AQE) are the most effective ways to optimize Spark partitions and resolve performance bottlenecks caused by data skew or excessive task overhead.

Apache Spark's execution speed is highly sensitive to how data is partitioned across the cluster. When a job stalls or runs slower than expected, the root cause is often an inappropriate number of shuffle partitions or a mismatch between leaf-node parallelism and the default shuffle parallelism. Understanding the configuration parameters and internal logic within the apache/spark repository allows you to systematically diagnose and fix these issues.

Core Configuration Parameters for Partition Tuning

Spark exposes several critical configuration parameters that control partition behavior. These are defined in sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:

Parameter | Default | What it controls
spark.sql.shuffle.partitions | 200 | Number of partitions created for all shuffle stages (joins, aggregations, groupBy, etc.).
spark.sql.leafNodeDefaultParallelism | SparkContext.defaultParallelism | Parallelism for leaf nodes that produce data (file scans, range, local scans).
spark.sql.adaptive.enabled | true since Spark 3.2 (false in earlier releases) | Enables Adaptive Query Execution (AQE), which can dynamically change shuffle partition counts at runtime.
spark.sql.stateful.shuffle.partitions | Mirrors spark.sql.shuffle.partitions | Number of partitions used for stateful streaming operators (e.g., mapGroupsWithState).

Spark validates that spark.sql.shuffle.partitions > 0 (see the checkValue call in SQLConf.scala lines 904-906). A zero or negative value is rejected with an error rather than being silently accepted.
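Before tuning, it can help to read the effective values back from a live session. The following is a minimal sketch, assuming Spark SQL is on the classpath and a local master is acceptable (in spark-shell, getOrCreate simply reuses the existing session). The app name and the value 64 are illustrative, not recommendations.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: a throwaway local session is fine for inspecting configuration.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("partition-config-check")
  .getOrCreate()

// Read the values currently in effect for this session.
val shuffleBefore = spark.conf.get("spark.sql.shuffle.partitions")
val aqeEnabled = spark.conf.get("spark.sql.adaptive.enabled")
println(s"shuffle.partitions=$shuffleBefore, adaptive.enabled=$aqeEnabled")

// Change the shuffle partition count for subsequent queries in this session.
spark.conf.set("spark.sql.shuffle.partitions", "64")
val updated = spark.conf.get("spark.sql.shuffle.partitions")

// A non-positive value should fail validation; Try captures the error.
val rejected = Try(spark.conf.set("spark.sql.shuffle.partitions", "0")).isFailure
println(s"updated=$updated, zero rejected=$rejected")
```

Wrapping the invalid assignment in Try keeps the sketch independent of the exact exception class, which may differ between Spark versions.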

How Spark Determines Partition Counts

Static Planning Without AQE

During query planning, EnsureRequirements (located in sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala) inspects each child of a join or exchange node to decide whether reshuffling is necessary:

  1. The algorithm first checks whether all children need reshuffling (shouldConsiderMinParallelism).
  2. If every child would need a shuffle, it enforces the minimum parallelism (conf.defaultNumShufflePartitions)—the value of spark.sql.shuffle.partitions.
  3. Otherwise, it prefers the child that already has the largest number of partitions and only reshuffles the other side.

This logic ensures that Spark does not unnecessarily increase partition counts when one side of a join is already well-distributed.
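The three steps above can be modelled in a few lines of plain Scala. This is a simplified illustration of the decision as described here, not the code in EnsureRequirements; the Child case class and the targetPartitions function are hypothetical names introduced only for this sketch.

```scala
// Hypothetical model of one side of a join: how many partitions it has now
// and whether its current partitioning fails the required distribution.
final case class Child(name: String, numPartitions: Int, needsShuffle: Boolean)

// Returns the partition count that reshuffled children would be aligned to.
def targetPartitions(children: Seq[Child], defaultNumShufflePartitions: Int): Int = {
  require(children.nonEmpty, "at least one child is required")
  val allNeedShuffle = children.forall(_.needsShuffle)
  if (allNeedShuffle) {
    // Step 2: every side is reshuffled, so fall back to the configured default.
    defaultNumShufflePartitions
  } else {
    // Step 3: keep the already-partitioned side with the most partitions
    // and reshuffle only the others to match it.
    children.filterNot(_.needsShuffle).map(_.numPartitions).max
  }
}

val bothShuffled = targetPartitions(
  Seq(Child("left", 8, needsShuffle = true), Child("right", 16, needsShuffle = true)),
  defaultNumShufflePartitions = 200)

val oneSideKept = targetPartitions(
  Seq(Child("left", 500, needsShuffle = false), Child("right", 16, needsShuffle = true)),
  defaultNumShufflePartitions = 200)

println(s"both shuffled -> $bothShuffled, one side kept -> $oneSideKept")
```

In the second call the left side keeps its 500 partitions and only the right side is reshuffled, which is why the configured default of 200 does not appear in the result.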

Adaptive Query Execution Dynamics

When spark.sql.adaptive.enabled is true, AdaptiveSparkPlanExec (in sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala) can dynamically adjust partitions:

  • Coalesce merges small adjacent shuffle partitions once a stage's output sizes are known, eliminating task-launch overhead.
  • Split divides an oversized partition into several tasks when a skewed key makes a single partition the bottleneck. Spark applies this to skewed sort-merge joins (spark.sql.adaptive.skewJoin.enabled).

AQE uses spark.sql.shuffle.partitions as its baseline, then optimizes from there based on runtime statistics.
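To make the coalescing idea concrete, here is a simplified model in plain Scala: adjacent partitions are merged greedily until the next one would push the group past a target size. It is a sketch of the general technique, not Spark's implementation, which handles more cases. The function name and the sizes are made up for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Groups adjacent partition indices so that each group stays at or below
// targetSize where possible. A partition larger than targetSize forms its own group.
def coalesceAdjacent(sizes: Seq[Long], targetSize: Long): Seq[Vector[Int]] = {
  require(targetSize > 0, "targetSize must be positive")
  val groups = ArrayBuffer.empty[Vector[Int]]
  var current = Vector.empty[Int]
  var currentSize = 0L
  sizes.zipWithIndex.foreach { case (size, idx) =>
    // Close the running group if adding this partition would exceed the target.
    if (current.nonEmpty && currentSize + size > targetSize) {
      groups += current
      current = Vector.empty[Int]
      currentSize = 0L
    }
    current = current :+ idx
    currentSize += size
  }
  if (current.nonEmpty) groups += current
  groups.toSeq
}

// Six shuffle partitions (sizes in MB) coalesced toward a 64 MB target.
val grouped = coalesceAdjacent(Seq(10L, 20L, 30L, 100L, 5L, 5L), targetSize = 64L)
println(grouped)
```

Here six partitions become three tasks. In Spark itself the target size is governed by spark.sql.adaptive.advisoryPartitionSizeInBytes, and coalescing can be toggled with spark.sql.adaptive.coalescePartitions.enabled.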

Practical Guidelines for Common Bottlenecks

Symptom | Likely partition issue | Recommended fix
Many tiny shuffle tasks (high task-launch overhead, CPU < 10%) | spark.sql.shuffle.partitions is too high for the data size. | Reduce the setting (e.g., spark.sql.shuffle.partitions=64). Enable spark.sql.adaptive.enabled=true to let Spark auto-coalesce.
Stage with a single long-running task | Data skew, or too few shuffle partitions for the data volume (the default 200 may be too low). | Increase the setting when many keys share one partition. When a single key dominates, more partitions will not help: salt the key or repartition on a different column.
High shuffle read size per executor (OOM) | leafNodeDefaultParallelism is high, leading to many small output files that later get shuffled into few large partitions. | Set spark.sql.leafNodeDefaultParallelism to a modest value, or manually repartition downstream.
Streaming job stalls on stateful operators | Stateful shuffle partitions inherit the default shuffle count, but streaming may need different partitioning for latency. | Explicitly set spark.sql.stateful.shuffle.partitions via spark.conf.set.
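To confirm which row of the table applies, it helps to count rows per physical partition. The sketch below uses synthetic data and a local session, both assumptions for illustration; on a real job you would run the same groupBy on the DataFrame feeding the slow stage.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, spark_partition_id}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("partition-skew-check")
  .getOrCreate()
import spark.implicits._

// Synthetic skew: 10,000 rows share the key "hot", three other keys have one row each.
val events = (Seq.fill(10000)("hot") ++ Seq("a", "b", "c")).toDF("key")

// Hash-partition by key, then count the rows that land in each physical partition.
val rowsPerPartition = events
  .repartition(8, $"key")
  .groupBy(spark_partition_id().as("partition"))
  .count()

rowsPerPartition.orderBy($"count".desc).show()

// The largest partition holds every "hot" row, however many partitions exist.
val largest = rowsPerPartition.agg(max($"count")).head().getLong(0)
println(s"largest partition holds $largest rows")
```

One partition that is far larger than the rest points to key skew rather than a wrong partition count, so raising spark.sql.shuffle.partitions alone would not shorten the long-running task.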

API-Level Partition Control

Global Configuration

Set defaults at the SparkSession level to affect all subsequent operations:

spark.conf.set("spark.sql.shuffle.partitions", 100)
spark.conf.set("spark.sql.leafNodeDefaultParallelism", 64)
spark.conf.set("spark.sql.adaptive.enabled", true)

Repartitioning DataFrames

For explicit control over distribution, use the repartition method defined in sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala (lines 1526-1532):

import org.apache.spark.sql.functions._
import spark.implicits._   // provides the $"column" syntax used below

val df = spark.read.parquet("hdfs:///data/events")
val repart = df
  .repartition(200, $"eventDate")   // hash-partition on a column
  .sortWithinPartitions($"timestamp".desc)

repart.write.mode("overwrite").parquet("hdfs:///out/partitioned")

Range Partitioning

For ordered data without hashing, use repartitionByRange (same file, lines 1996-2001):

val ranged = df.repartitionByRange(50, $"eventDate")

Coalesce for Reducing Partitions

When you need fewer partitions without a full shuffle (e.g., after filtering), use coalesce:

val reduced = df.coalesce(20)   // safe when data is already partitioned reasonably

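Under the hood, Dataset.coalesce builds a Repartition plan node with shuffle = false, whereas repartition builds one with shuffle = true. Because no shuffle occurs, coalesce can only lower the partition count, never raise it.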
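The difference is easy to observe by checking partition counts directly. The following is a small sketch on generated data with a local session; the row count and partition numbers are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("coalesce-vs-repartition")
  .getOrCreate()

val wide = spark.range(0, 100000).repartition(8)  // full shuffle into 8 partitions
val narrowed = wide.coalesce(2)                   // merges existing partitions, no shuffle
val unchanged = wide.coalesce(20)                 // a request for more partitions has no effect

val wideCount = wide.rdd.getNumPartitions
val narrowedCount = narrowed.rdd.getNumPartitions
val unchangedCount = unchanged.rdd.getNumPartitions
println(s"repartition(8)=$wideCount, coalesce(2)=$narrowedCount, coalesce(20)=$unchangedCount")
```

Asking coalesce for 20 partitions leaves the data at 8, which is why repartition is needed whenever the count has to grow.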

Streaming Stateful Partitions

For stateful streaming operations, control partitions via the internal config used in sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala (lines 329-336):

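spark.conf.set("spark.sql.stateful.shuffle.partitions", 200)
// df must be a typed streaming Dataset whose element type has a userId field
val query = df
  .groupByKey(_.userId)
  .mapGroupsWithState(...)   // state update function elided
  .writeStream.format("delta").start()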

Why Partition Tuning Matters

Understanding the mechanics of partition optimization helps you balance four critical factors:

  • Task parallelism vs. overhead – Too many partitions create many tiny tasks, causing scheduler and RPC overhead to dominate CPU usage.
  • Shuffle I/O – Each map task writes output for every shuffle partition, so the number of shuffle blocks to fetch grows with (map tasks × shuffle partitions). An excessive partition count multiplies small reads and network requests across executors.
  • Data skew – A single partition may become a hotspot; increasing partitions or repartitioning on a different key spreads the load.
  • Memory pressure – Each shuffle task allocates buffers proportional to partition size; overly large partitions risk OutOfMemory errors.

By aligning spark.sql.shuffle.partitions, spark.sql.leafNodeDefaultParallelism, and explicit repartition calls with actual data size and cluster resources, you achieve a balanced parallelism-to-shuffle-cost ratio essential for high-throughput Spark jobs.
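One way to turn "actual data size and cluster resources" into a number is simple arithmetic: divide the expected shuffle volume by a target partition size, then round up to a multiple of the available cores so the final wave of tasks is full. The sketch below is a rule-of-thumb calculator under stated assumptions, not a Spark API; the 128 MiB target and the 48-core cluster are illustrative values, and both function names are hypothetical.

```scala
// Number of partitions needed so that each holds at most targetPartitionBytes.
def suggestShufflePartitions(shuffleBytes: Long, targetPartitionBytes: Long): Int = {
  require(shuffleBytes >= 0, "shuffleBytes must be non-negative")
  require(targetPartitionBytes > 0, "targetPartitionBytes must be positive")
  val needed = (shuffleBytes + targetPartitionBytes - 1) / targetPartitionBytes // ceiling division
  math.max(1L, needed).toInt
}

// Rounds a partition count up to the next multiple of the cluster's core count.
def roundUpToMultiple(partitions: Int, totalCores: Int): Int = {
  require(totalCores > 0, "totalCores must be positive")
  ((partitions + totalCores - 1) / totalCores) * totalCores
}

val MiB = 1024L * 1024L
val GiB = 1024L * MiB

val forLargeShuffle = suggestShufflePartitions(100 * GiB, 128 * MiB)
val forSmallShuffle = suggestShufflePartitions(1 * GiB, 128 * MiB)
val alignedToCores = roundUpToMultiple(forLargeShuffle, totalCores = 48)

println(s"100 GiB -> $forLargeShuffle, 1 GiB -> $forSmallShuffle, aligned to 48 cores -> $alignedToCores")
```

The shuffle volume itself can be read from the Shuffle Write column of the Spark UI for a previous run of the same stage. Treat the result as a starting point to validate against task durations.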

Summary

  • Start with spark.sql.shuffle.partitions – The default 200 is rarely optimal; adjust it to your data volume and cluster size (a common starting point is 1-2x the total number of cores).
  • Enable Adaptive Query Execution – Set spark.sql.adaptive.enabled=true to let Spark dynamically coalesce small partitions or split skewed ones at runtime.
  • Match leaf-node parallelism – Configure spark.sql.leafNodeDefaultParallelism to prevent over-partitioning at the source when reading files.
  • Use explicit repartitioning for skew – Apply repartition(column) or repartitionByRange to rebalance uneven partitions before expensive joins or aggregations, and salt the key when a single value dominates.
  • Reduce partitions safely with coalesce – Use coalesce() instead of repartition() when decreasing partition counts without requiring a full shuffle.
  • Tune streaming separately – Override spark.sql.stateful.shuffle.partitions for stateful streaming operations to optimize latency.

Frequently Asked Questions

What is the default value of spark.sql.shuffle.partitions?

The default value is 200, defined in sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala. This default was chosen as a general-purpose starting point, but it is rarely optimal for modern workloads. You should adjust this value based on your cluster size and data volume—typically setting it to 1-2 times the total number of CPU cores in your cluster.

How does Adaptive Query Execution improve partition performance?

Adaptive Query Execution (AQE), controlled by spark.sql.adaptive.enabled in SQLConf.scala, allows Spark to dynamically optimize shuffle partition counts at runtime. According to the implementation in AdaptiveSparkPlanExec.scala, AQE can coalesce small partitions to reduce task overhead when partitions are too small, or split large partitions when data skew causes a single partition to become a bottleneck. This eliminates the need to guess the perfect partition count upfront.

When should I use coalesce instead of repartition?

Use coalesce when you need to reduce the number of partitions and can avoid a full shuffle. As implemented in sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala, coalesce builds a Repartition plan node with shuffle = false, so it merges existing partitions instead of redistributing rows. This makes it efficient after operations like filtering, where data is already reasonably distributed. Use repartition when you need to increase partitions, change the partitioning key, or ensure a full shuffle to distribute skewed data evenly across the cluster.

How do I handle data skew when optimizing partitions?

Data skew occurs when a few keys contain disproportionately large amounts of data, causing some partitions to run much longer than others. Increasing spark.sql.shuffle.partitions helps when several heavy keys hash into the same partition, but it cannot break up a single hot key, because every row with that key still lands in one partition. For a hot key, use salting (adding a random prefix or suffix to the key) so its rows spread across several partitions. Alternatively, use repartitionByRange as defined in Dataset.scala to distribute data based on range rather than hash, which can balance partitions better when the values are spread over a wide range. For severe skew in joins, enable AQE so Spark can automatically split oversized partitions at runtime.
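Salting a join can be sketched as follows. The large side gets a random salt per row, and the small side is replicated once per salt value so that every salted key still finds its match. The data, column names, and the salt count of 8 are assumptions for illustration; a real job would size the salt count to the degree of skew.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, explode, lit, rand}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("salting-sketch")
  .getOrCreate()
import spark.implicits._

val numSalts = 8 // illustrative; more salts spread a hot key across more tasks

// Skewed fact side: one key carries almost all rows.
val facts = (Seq.fill(10000)("hot") ++ Seq("a", "b", "c")).toDF("key")
// Small dimension side with one row per key.
val dims = Seq(("hot", 1), ("a", 2), ("b", 3), ("c", 4)).toDF("key", "value")

// Large side: attach a random salt in [0, numSalts) to every row.
val saltedFacts = facts.withColumn("salt", (rand() * numSalts).cast("int"))

// Small side: replicate each row once per salt value.
val saltedDims = dims.withColumn(
  "salt", explode(array((0 until numSalts).map(i => lit(i)): _*)))

// Joining on (key, salt) spreads the hot key over up to numSalts partitions.
val joined = saltedFacts.join(saltedDims, Seq("key", "salt")).drop("salt")
val joinedRows = joined.count()
println(s"joined rows: $joinedRows")
```

The row count matches an unsalted join, since each fact row pairs with exactly one replicated dimension row. The cost is that the small side grows by a factor of numSalts, so salting suits joins where that side is modest in size.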
