Common Issues When Configuring the Spark Cassandra Connector for Large Datasets

When scaling the Spark Cassandra connector to billions of rows, developers typically hit three problems: too few Spark partitions, memory pressure from oversized fetches, and coordinator overload during writes. Most of these can be resolved by tuning split size, fetch size, batch size, write concurrency, and consistency level.

The Spark Cassandra connector, maintained by DataStax and widely used with Apache Spark, enables seamless integration between Spark DataFrames and Cassandra tables. When processing large datasets spanning millions or billions of rows, improper configuration of this connector leads to out-of-memory errors, timeout exceptions, and severe underutilization of cluster resources.

Understanding the Connector Architecture

The connector operates as a native Spark data source, translating Spark operations into CQL requests. For RDD-level access, importing com.datastax.spark.connector._ adds sc.cassandraTable(...) and rdd.saveToCassandra(...) (defined in SparkContextFunctions.scala and RDDFunctions.scala); DataFrame access goes through the org.apache.spark.sql.cassandra format. Early releases also offered a CassandraSQLContext, which later releases dropped in favor of SparkSession.

When reading data, the connector groups Cassandra token ranges into Spark partitions, using the table's size estimates to approximate the configured split size. DefaultConnectionFactory (in CassandraConnectionFactory.scala) builds the underlying DataStax Java driver sessions; the connector caches these sessions per executor JVM and closes idle ones after the keep-alive period. For writes, WriteConf.scala encapsulates batch sizing, grouping, concurrency, and consistency settings, all of which directly affect coordinator load.
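The token-range idea can be sketched in plain Scala. This is a simplified model, not the connector's code: it assumes the default Murmur3Partitioner, whose tokens span -2^63 to 2^63 - 1, and slices the ring into equal pieces, whereas the connector sizes its splits from the table's size estimates and actual token ownership. The object name TokenRangeSplit is hypothetical.

```scala
// Simplified model (not connector code): evenly slice the Murmur3Partitioner
// token ring into n contiguous ranges, one per hypothetical Spark partition.
object TokenRangeSplit {
  // Murmur3Partitioner tokens cover [-2^63, 2^63 - 1]
  private val MinToken: BigInt = BigInt(Long.MinValue)
  private val MaxToken: BigInt = BigInt(Long.MaxValue)

  /** n + 1 evenly spaced boundaries from the minimum to the maximum token. */
  def boundaries(n: Int): Vector[Long] = {
    require(n > 0, "need at least one range")
    // The ring width (2^64 - 1) does not fit in a Long, hence BigInt arithmetic
    val width = MaxToken - MinToken
    (0 to n).map(i => (MinToken + width * BigInt(i) / BigInt(n)).toLong).toVector
  }

  /** Consecutive (start, end) pairs; each range ends where the next one starts. */
  def ranges(n: Int): Vector[(Long, Long)] = {
    val b = boundaries(n)
    b.zip(b.tail)
  }
}
```

For example, TokenRangeSplit.ranges(4) returns four contiguous ranges starting at Long.MinValue and ending at Long.MaxValue; more ranges means more, smaller Spark partitions.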

Partitioning and Parallelism Bottlenecks

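The most common performance issue is too few Spark partitions. The connector groups token ranges into Spark partitions so that each covers roughly spark.cassandra.input.split.size_in_mb of data, based on the table's size estimates (64 MB by default in older releases; newer releases use a larger default). On large tables with few nodes, or when size estimates are stale, this can produce too few partitions to keep every executor core busy.

To resolve this, reduce spark.cassandra.input.split.size_in_mb so each partition covers less data. Alternatively, set the splitCount read option on an individual DataFrame read to a multiple of your total executor cores. Property names in this article follow the 2.x convention; 3.x releases renamed several to camelCase (for example spark.cassandra.input.split.sizeInMB), so check the reference for your version.

spark.conf.set("spark.cassandra.input.split.size_in_mb", "16")
// Per-read alternative: .option("splitCount", "200") on spark.read (verify support in your release)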
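A rough calculation helps when picking a split size. The sketch below assumes the connector yields about table size ÷ split size partitions, which is only as accurate as Cassandra's size estimates, and targets a few tasks per core so one slow partition does not leave cores idle. SplitSizing and its methods are hypothetical names.

```scala
// Back-of-the-envelope model (assumption): partitions ≈ tableSize / splitSize.
object SplitSizing {
  /** Approximate Spark partitions when each covers about splitSizeMB (rounded up). */
  def estimatedPartitions(tableSizeMB: Long, splitSizeMB: Long): Long = {
    require(tableSizeMB >= 0 && splitSizeMB > 0)
    (tableSizeMB + splitSizeMB - 1) / splitSizeMB
  }

  /** Largest whole-MB split size giving at least tasksPerCore * totalCores partitions
    * (when the table is large enough; never returns less than 1 MB). */
  def splitSizeFor(tableSizeMB: Long, totalCores: Int, tasksPerCore: Int = 3): Long = {
    require(totalCores > 0 && tasksPerCore > 0)
    val targetPartitions = totalCores.toLong * tasksPerCore
    math.max(1L, tableSizeMB / targetPartitions)
  }
}
```

For a 500 GB (512,000 MB) table on 200 cores, splitSizeFor(512000, 200) returns 853, which yields about 601 partitions; the 16 MB setting above would instead produce 32,000 smaller partitions.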

Memory Pressure and Fetch Size Tuning

Out-of-memory errors on executors usually mean a task holds more data than its heap allows. The connector pages through results, requesting spark.cassandra.input.fetch.size_in_rows rows at a time, so a very large fetch size, very wide rows, or oversized Spark partitions can exhaust executor memory.

Keep the fetch size in the low thousands (the default is 1000 rows) and, if memory is still tight, reduce the split size so each task reads less data:

spark.conf.set("spark.cassandra.input.fetch.size_in_rows", "5000")
spark.conf.set("spark.cassandra.input.split.size_in_mb", "16")  // less data per task
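A quick estimate relates fetch size to memory. This sketch assumes each running task buffers roughly one page of fetch-size rows at a time and that you know an average serialized row size; it ignores JVM object overhead, which can make the in-memory footprint noticeably larger. FetchSizing and its methods are hypothetical.

```scala
// Rough model (assumption): one page of fetchSizeRows rows is buffered per task at a time.
object FetchSizing {
  /** Approximate bytes held by one page of rows. */
  def pageBytes(fetchSizeRows: Int, avgRowBytes: Long): Long =
    fetchSizeRows.toLong * avgRowBytes

  /** Largest fetch size whose page fits in pageBudgetBytes, capped at maxRows, at least 1. */
  def fetchSizeFor(pageBudgetBytes: Long, avgRowBytes: Long, maxRows: Int = 5000): Int = {
    require(pageBudgetBytes > 0 && avgRowBytes > 0 && maxRows > 0)
    math.max(1L, math.min(maxRows.toLong, pageBudgetBytes / avgRowBytes)).toInt
  }

  /** Page memory across all concurrently running tasks on one executor. */
  def executorPageBytes(cores: Int, fetchSizeRows: Int, avgRowBytes: Long): Long =
    cores.toLong * pageBytes(fetchSizeRows, avgRowBytes)
}
```

With 2 KB rows, a 5000-row page is about 10 MB, or roughly 40 MB across four concurrent tasks, which is small next to an 8 GB executor. Rows of hundreds of kilobytes change the picture quickly.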

Write Batching and Coordinator Overload

Stalled write throughput and "Write timeout" exceptions usually point to overloaded coordinators. Large batches, set via spark.cassandra.output.batch.size.rows, concentrate work on individual coordinators, while too little concurrency leaves the cluster's write capacity unused.

Keep batch sizes modest (100–200 rows) and raise per-task concurrency step by step, backing off if timeouts appear:

spark.conf.set("spark.cassandra.output.batch.size.rows", "100")
spark.conf.set("spark.cassandra.output.concurrent.writes", "20")
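Concurrency multiplies quickly. The connector reference describes spark.cassandra.output.concurrent.writes as the maximum number of batches a single Spark task executes in parallel, so a cluster-wide upper bound on outstanding batches is roughly executors × cores per executor × concurrent writes. The hypothetical helper below computes that bound; real in-flight counts are usually lower.

```scala
// Upper-bound model (assumption): every running task keeps concurrentWrites batches in flight.
object WriteConcurrency {
  def inFlightBatches(executors: Int, coresPerExecutor: Int, concurrentWrites: Int): Long =
    executors.toLong * coresPerExecutor * concurrentWrites

  def inFlightRows(executors: Int, coresPerExecutor: Int, concurrentWrites: Int, batchRows: Int): Long =
    inFlightBatches(executors, coresPerExecutor, concurrentWrites) * batchRows
}
```

With 10 executors of 4 cores each, the settings above allow up to 800 batches (80,000 rows) in flight at once, which is why raising concurrency should go hand in hand with watching write latency on the Cassandra side.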

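For bulk loads, keep spark.cassandra.output.batch.grouping.key at partition (the default) so each batch holds rows for a single partition key and is handled by one replica set, reducing coordinator fan-out. The other accepted values are replica_set and none.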
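The effect of grouping by partition key can be modeled in plain Scala. This illustrates the idea only; it is not the connector's batch builder, which also buffers a limited number of pending batches and can cap batches by bytes. The EventRow type and PartitionBatching object are hypothetical.

```scala
// Illustration only: split rows into batches that each hold a single partition key.
final case class EventRow(userId: String, eventTime: Long, eventType: String)

object PartitionBatching {
  /** Groups rows by partition key (userId here), then caps each batch at maxRows rows. */
  def batches(rows: Seq[EventRow], maxRows: Int): Seq[Seq[EventRow]] = {
    require(maxRows > 0, "maxRows must be positive")
    rows
      .groupBy(_.userId) // one group per partition key
      .toSeq
      .sortBy(_._1)      // deterministic order for readability
      .flatMap { case (_, sameKey) => sameKey.grouped(maxRows).toSeq }
  }
}
```

Three rows for user "a" and one for user "b" with maxRows = 2 become three batches, none of which mixes partition keys.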

Consistency Level and Retry Configuration

Setting consistency to ALL on large write-heavy jobs introduces high latency and failure rates. Each write must be acknowledged by every replica, so network hiccups trigger expensive retries.

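Use ONE or QUORUM for analytical loads (LOCAL_ONE or LOCAL_QUORUM in multi-datacenter clusters; the connector defaults to LOCAL_ONE for reads and LOCAL_QUORUM for writes), and bound retries with spark.cassandra.query.retry.count (default 60) so persistent failures surface quickly:

spark.conf.set("spark.cassandra.output.consistency.level", "ONE")
spark.conf.set("spark.cassandra.query.retry.count", "5")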
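The arithmetic behind this advice is simple. In a single datacenter with replication factor RF, ONE needs one replica acknowledgment, QUORUM needs floor(RF / 2) + 1, and ALL needs RF. The sketch below uses hypothetical names and covers a single datacenter only (QUORUM across several datacenters counts replicas in all of them); it shows why ALL fails as soon as one replica is unavailable.

```scala
// Single-datacenter model of how many replica acknowledgments each level needs.
object Consistency {
  sealed trait Level
  case object One extends Level
  case object Quorum extends Level
  case object All extends Level

  def requiredAcks(level: Level, replicationFactor: Int): Int = level match {
    case One    => 1
    case Quorum => replicationFactor / 2 + 1
    case All    => replicationFactor
  }

  /** True if enough replicas are up to satisfy the level. */
  def canSucceed(level: Level, replicationFactor: Int, replicasUp: Int): Boolean =
    replicasUp >= requiredAcks(level, replicationFactor)
}
```

With RF = 3 and one replica down, QUORUM still succeeds (2 of 3 acknowledgments) while ALL cannot.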

Connection Pool and Network Limits

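"Too many open connections" errors occur when the number of client connections exceeds the server's limit, set by native_transport_max_concurrent_connections (and its per-IP variant) in cassandra.yaml; native_transport_max_threads limits request-processing threads, not connections. Each executor JVM opens its own driver session with a pool of connections to the cluster's nodes, so a large number of executors multiplies the connection count.

Close idle sessions sooner by lowering the keep-alive period, or raise the server-side limit:

spark.conf.set("spark.cassandra.connection.keep_alive_ms", "5000")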
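To check whether executor count is the problem, estimate connections per Cassandra node. This sketch assumes every executor JVM holds one cached session that opens a fixed number of connections to each node (the driver's pool size is configurable, so check your connector and driver settings) and treats a negative limit as unlimited, as cassandra.yaml does with its -1 default. ConnectionBudget is a hypothetical name.

```scala
// Simplified model (assumption): each executor JVM keeps one session with a fixed number
// of connections to every node, and no other clients connect to the cluster.
object ConnectionBudget {
  def connectionsPerNode(executors: Int, connectionsPerExecutorPerNode: Int): Long =
    executors.toLong * connectionsPerExecutorPerNode

  /** A negative limit means unlimited, matching the -1 default in cassandra.yaml. */
  def withinLimit(connectionsPerNode: Long, serverLimit: Long): Boolean =
    serverLimit < 0 || connectionsPerNode <= serverLimit
}
```

For example, 100 executors with 2 connections each per node put 200 connections on every node, which exceeds a limit of 128 but is fine when the limit is left at -1.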

Version Compatibility and Driver Mismatches

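ClassNotFoundException or NoSuchMethodError usually indicates version skew between Spark, Scala, the connector, and the DataStax Java driver. Each connector release is built for a particular Spark line and Scala binary version (the _2.12 or _2.13 artifact suffix) and depends on a specific Java driver version, so a connector built for another Spark or Scala version, or a second driver version on the classpath, breaks at runtime. The driver speaks Cassandra's native protocol and works across Cassandra versions, so the cluster version is rarely the culprit.

Check the version compatibility table in the connector's README, pick the release that matches your Spark and Scala versions, and let the connector bring in its own Java driver rather than upgrading the driver separately; connector 2.5 and later are built on Java driver 4.x.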
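A frequent cause of NoSuchMethodError is a connector artifact built for a different Scala binary version than the running Spark. The small helper below (hypothetical names) compares an artifact's suffix against a full Scala version string. On Scala 2 you can pass scala.util.Properties.versionNumberString, and spark-submit --version prints the Scala version Spark was built with.

```scala
// Hypothetical helper: does an artifact's Scala suffix match a full Scala version?
object ScalaBinaryCheck {
  /** "2.12.18" -> "2.12"; Scala 3 artifacts use "3" as the binary suffix. */
  def binaryVersion(fullVersion: String): String = {
    val parts = fullVersion.split('.')
    if (parts.head == "3") "3" else parts.take(2).mkString(".")
  }

  /** True if an artifact such as "spark-cassandra-connector_2.12" targets this Scala version. */
  def artifactMatches(artifactId: String, fullScalaVersion: String): Boolean =
    artifactId.endsWith("_" + binaryVersion(fullScalaVersion))
}
```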

Optimized Configuration Examples for Large Datasets

Optimized Read for a 500 GB Table

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CassandraReadTune")
  .config("spark.cassandra.connection.host", "10.0.0.1")  // hypothetical contact point
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .config("spark.cassandra.input.split.size_in_mb", "16")
  .config("spark.cassandra.input.fetch.size_in_rows", "5000")
  .getOrCreate()

// Required for the $"column" syntax below
import spark.implicits._

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "events"))
  .load()

// Unless event_type is a key or indexed column, this filter runs in Spark after the scan
df.filter($"event_type" === "click")
  .groupBy($"user_id")
  .count()
  .show()

Bulk Write with Controlled Batching

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("CassandraBulkWrite")
  .config("spark.cassandra.connection.host", "10.0.0.1")  // hypothetical contact point
  .config("spark.cassandra.output.batch.size.rows", "100")
  .config("spark.cassandra.output.concurrent.writes", "20")
  .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
  .getOrCreate()

// Hypothetical source: any DataFrame whose columns match analytics.events works
val largeDf = spark.read.parquet("/data/events")

largeDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "events"))
  .mode(SaveMode.Append)
  .save()

Tuning Consistency and Retries

spark.conf.set("spark.cassandra.query.retry.count", "5")
spark.conf.set("spark.cassandra.output.consistency.level", "ONE")
spark.conf.set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")

Summary

  • Partitioning bottlenecks occur when spark.cassandra.input.split.size_in_mb is too large for the cluster; reduce this value or set the splitCount read option to a multiple of executor cores.
  • Memory pressure arises from wide rows, oversized splits, or excessive fetch sizes; keep spark.cassandra.input.fetch.size_in_rows to a few thousand rows per page.
  • Coordinator overload during writes results from large batch sizes; keep spark.cassandra.output.batch.size.rows under 200 and raise spark.cassandra.output.concurrent.writes only while write latency stays healthy.
  • Consistency levels set to ALL cause high latency and failure rates on large jobs; prefer ONE or QUORUM (or their LOCAL_ variants in multi-datacenter clusters) for analytical workloads.
  • Connection limits require lowering spark.cassandra.connection.keep_alive_ms or raising Cassandra’s native_transport_max_concurrent_connections to prevent "too many connections" errors.

Frequently Asked Questions

Why does my Spark job fail with OutOfMemoryError when reading from Cassandra?

This usually occurs when a task holds more rows in memory than the executor heap allows, because of very wide rows, oversized Spark partitions, or a large fetch size. Keep spark.cassandra.input.fetch.size_in_rows at 5000 or lower, and reduce spark.cassandra.input.split.size_in_mb so each task reads less data.

How do I prevent "Write timeout" exceptions during bulk loads to Cassandra?

Write timeouts indicate overloaded coordinators or replicas. Decrease spark.cassandra.output.batch.size.rows to 100–200 rows per batch, and tune spark.cassandra.output.concurrent.writes: raising it (for example to 20) allows more parallel in-flight requests, but if timeouts persist, lower it instead. For large bulk loads, keep spark.cassandra.output.batch.grouping.key set to partition so each batch targets a single partition.

What causes "Too many open connections" errors in the Spark Cassandra connector?

Each executor maintains a pool of connections to Cassandra nodes. When many executors run at once, the total connection count can exceed Cassandra’s native_transport_max_concurrent_connections limit. Close idle sessions sooner by lowering spark.cassandra.connection.keep_alive_ms, or raise the limit in Cassandra’s cassandra.yaml.

Which consistency level should I use for large-scale analytical workloads?

For read-heavy analytical jobs processing billions of rows, use ONE or LOCAL_QUORUM for reads to minimize latency and avoid retry storms. For writes, use ONE or QUORUM rather than ALL, as ALL requires acknowledgment from every replica and causes high failure rates under network variability or node stress.
