Common Issues When Configuring the Spark Cassandra Connector for Large Datasets
When scaling the Spark Cassandra connector to billions of rows, developers typically run into partitioning bottlenecks, memory pressure from oversized fetches, and coordinator overload during writes. Most of these can be mitigated by tuning split size, fetch size, batch size, write concurrency, and consistency level.
The Spark Cassandra connector, maintained by DataStax and widely used with Apache Spark, connects Spark RDDs and DataFrames to Cassandra tables. When jobs process millions or billions of rows, an improperly configured connector commonly causes out-of-memory errors, timeout exceptions, and severe underutilization of cluster resources.
Understanding the Connector Architecture
The connector operates as a native Spark data source, translating Spark operations into CQL requests. RDD-level access comes from implicit functions added to SparkContext (for example, sc.cassandraTable after import com.datastax.spark.connector._), while DataFrame access goes through the org.apache.spark.sql.cassandra format. CassandraSQLContext belonged to the 1.x releases and is no longer part of the connector.
When reading data, the connector divides the Cassandra token ring into token ranges and groups them into Spark partitions, using Cassandra's size estimates to approximate the configured split size. DefaultConnectionFactory builds the underlying DataStax Java driver sessions, which the connector caches and shares across tasks in the same executor, closing them once they have been idle for the keep-alive period. For writes, the WriteConf class encapsulates batch sizing, grouping, concurrency, and consistency settings that directly affect coordinator load.
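To make the token-range idea concrete, here is a minimal Scala sketch that splits the full token ring into n contiguous ranges. It assumes the default Murmur3Partitioner (tokens from -2^63 to 2^63 - 1) and is a simplified model, not the connector's implementation, which also groups ranges by replica and sizes splits from Cassandra's size estimates. TokenRingSplitter and TokenRange are hypothetical names.

```scala
// Simplified model: split the Murmur3 token ring into n contiguous ranges.
// Not the connector's code; TokenRingSplitter and TokenRange are hypothetical.
object TokenRingSplitter {
  // Murmur3Partitioner tokens are signed 64-bit values.
  val MinToken: BigInt = BigInt(Long.MinValue)
  val MaxToken: BigInt = BigInt(Long.MaxValue)

  // Each range covers tokens in (start, end], following Cassandra's convention.
  final case class TokenRange(start: BigInt, end: BigInt)

  def split(n: Int): Vector[TokenRange] = {
    require(n > 0, "n must be positive")
    val width = MaxToken - MinToken
    // Boundary i sits i/n of the way around the ring; adjacent ranges share a boundary.
    def boundary(i: Int): BigInt = MinToken + width * i / n
    Vector.tabulate(n)(i => TokenRange(boundary(i), boundary(i + 1)))
  }

  def main(args: Array[String]): Unit =
    split(4).foreach(r => println(s"(${r.start}, ${r.end}]"))
}
```

Because boundary(n) equals MaxToken, the ranges cover the whole ring with no gaps or overlaps.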
Partitioning and Parallelism Bottlenecks
The most common performance issue stems from Spark partitions that are too few or too large. The connector sizes each Spark partition by estimated data volume, controlled by spark.cassandra.input.split.size_in_mb (64 MB by default in older releases; newer releases rename it spark.cassandra.input.split.sizeInMB and raise the default to 512 MB). When the split size is large relative to the data, or Cassandra's size estimates understate the table, the job gets too few partitions to keep all executor cores busy.
To resolve this, reduce spark.cassandra.input.split.size_in_mb so that each Spark partition covers less data. Alternatively, request a fixed number of Spark partitions with the splitCount read option, set to a multiple of your total executor cores.
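spark.conf.set("spark.cassandra.input.split.size_in_mb", "16") // smaller splits -> more Spark partitions
val events = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "events", "splitCount" -> "200"))
  .load() // splitCount fixes the number of Spark partitions for this read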
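The arithmetic behind these two settings can be sketched in plain Scala. This assumes the table size estimate is accurate and data is spread evenly, and it ignores any minimum partition count a given connector version enforces; SplitSizing and its methods are hypothetical helpers, not connector APIs.

```scala
// Back-of-the-envelope read sizing. Hypothetical helpers; assumes an accurate
// table size estimate and evenly distributed data.
object SplitSizing {
  // Approximate number of Spark partitions for a given split size (rounded up).
  def estimatedSplits(tableSizeMB: Long, splitSizeMB: Long): Long = {
    require(tableSizeMB >= 0 && splitSizeMB > 0)
    (tableSizeMB + splitSizeMB - 1) / splitSizeMB
  }

  // A split count that is a whole multiple of the available executor cores,
  // so every core stays busy for `waves` rounds of tasks.
  def splitCountForCores(executors: Int, coresPerExecutor: Int, waves: Int): Int = {
    require(executors > 0 && coresPerExecutor > 0 && waves > 0)
    executors * coresPerExecutor * waves
  }

  def main(args: Array[String]): Unit = {
    val tableMB = 500L * 1024 // a ~500 GB table
    println(s"64 MB splits: ${estimatedSplits(tableMB, 64)} partitions")
    println(s"16 MB splits: ${estimatedSplits(tableMB, 16)} partitions")
    println(s"10 executors x 4 cores x 3 waves: splitCount = ${splitCountForCores(10, 4, 3)}")
  }
}
```

Choosing a whole multiple of the core count avoids a final wave in which only a few cores have work.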
Memory Pressure and Fetch Size Tuning
Out-of-memory errors on executors typically occur when a task holds too much data at once. The connector fetches rows in pages of spark.cassandra.input.fetch.size_in_rows rows (1000 by default), so a large fetch size combined with wide rows can overwhelm the executor heap, particularly when several tasks share one executor.
Keep the fetch size to a few thousand rows at most, lower it further for wide rows, and reduce the split size if individual tasks still read too much data:
spark.conf.set("spark.cassandra.input.fetch.size_in_rows", "5000")
spark.conf.set("spark.cassandra.input.split.size_in_mb", "16")
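A rough way to pick a fetch size is to bound page memory per task. In this sketch avgRowBytes is an assumption you would measure for your own table, and JVM object overhead is ignored, so the figures are lower bounds; FetchSizing is a hypothetical helper.

```scala
// Rough page-memory estimate for a fetch size. Hypothetical helper;
// avgRowBytes is measured by you, and JVM object overhead is ignored.
object FetchSizing {
  // Serialized bytes held by one page of fetched rows.
  def pageBytes(fetchSizeRows: Int, avgRowBytes: Long): Long =
    fetchSizeRows.toLong * avgRowBytes

  // Pages held at once when several tasks run on the same executor.
  def executorPageBytes(fetchSizeRows: Int, avgRowBytes: Long, concurrentTasks: Int): Long =
    pageBytes(fetchSizeRows, avgRowBytes) * concurrentTasks

  // Largest fetch size whose page fits in budgetBytes, capped at maxRows (minimum 1).
  def fetchSizeFor(budgetBytes: Long, avgRowBytes: Long, maxRows: Int): Int = {
    require(budgetBytes > 0 && avgRowBytes > 0 && maxRows > 0)
    math.max(1L, math.min(budgetBytes / avgRowBytes, maxRows.toLong)).toInt
  }

  def main(args: Array[String]): Unit = {
    println(s"5000 rows of 2 KB: ${pageBytes(5000, 2048)} bytes per page")
    println(s"4 tasks per executor: ${executorPageBytes(5000, 2048, 4)} bytes")
    println(s"fetch size for 4 KB rows in 8 MB: ${fetchSizeFor(8L * 1024 * 1024, 4096, 5000)}")
  }
}
```

For wide rows the budget, not the cap, determines the fetch size, which is why the advice above is to lower it for wide rows.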
Write Batching and Coordinator Overload
Write throughput stalls and "Write timeout" exceptions often indicate coordinator hotspotting. Large batches, set with spark.cassandra.output.batch.size.rows, concentrate load on individual coordinators, while too little concurrency leaves cluster capacity unused.
Keep batch sizes modest (100–200 rows) and, if the cluster has headroom, increase the number of batches each task writes in parallel:
spark.conf.set("spark.cassandra.output.batch.size.rows", "100")
spark.conf.set("spark.cassandra.output.concurrent.writes", "20")
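Because spark.cassandra.output.concurrent.writes limits the batches in flight per Spark task, the load the cluster sees scales with the number of concurrently running write tasks. This sketch estimates that upper bound, assuming every executor core runs one write task at a time; WriteConcurrency is a hypothetical helper.

```scala
// Upper bound on write batches in flight across a job. Hypothetical helper;
// assumes one running write task per executor core.
object WriteConcurrency {
  // concurrent.writes is a per-task limit, so multiply by running tasks.
  def batchesInFlight(executors: Int, coresPerExecutor: Int, concurrentWrites: Int): Int =
    executors * coresPerExecutor * concurrentWrites

  def rowsInFlight(batches: Int, batchSizeRows: Int): Long =
    batches.toLong * batchSizeRows

  def main(args: Array[String]): Unit = {
    val batches = batchesInFlight(executors = 10, coresPerExecutor = 4, concurrentWrites = 20)
    println(s"batches in flight: $batches, rows in flight: ${rowsInFlight(batches, 100)}")
  }
}
```

Doubling concurrent.writes doubles this bound, so raising it only helps while coordinators have spare capacity.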
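For bulk loads, check spark.cassandra.output.batch.grouping.key. Its default value, partition, already groups rows that share a partition key into the same batch (replica_set groups by replica set instead, and none disables grouping), which reduces coordinator hops.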
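The idea behind partition-key grouping can be modeled in a few lines: rows that share a partition key go into the same batch, and each batch is capped at a fixed row count. This is a simplified illustration, not the connector's implementation (which also buffers rows and bounds batch size in bytes); Row and PartitionKeyBatching are hypothetical names.

```scala
// Simplified model of partition-key batch grouping. Hypothetical types;
// the connector's real writer also buffers rows and limits batch bytes.
object PartitionKeyBatching {
  final case class Row(partitionKey: String, value: Int)

  def batches(rows: Seq[Row], batchSizeRows: Int): Seq[Seq[Row]] = {
    require(batchSizeRows > 0)
    rows
      .groupBy(_.partitionKey) // one group per partition key
      .toSeq
      .sortBy(_._1)            // deterministic order for the demo
      .flatMap { case (_, group) => group.grouped(batchSizeRows).toSeq }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("a", 1), Row("a", 2), Row("a", 3), Row("b", 4))
    batches(rows, batchSizeRows = 2).foreach(b => println(b.map(_.value).mkString("[", ",", "]")))
  }
}
```

Every batch targets a single partition, so one replica set can apply it without the coordinator fanning out to other nodes.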
Consistency Level and Retry Configuration
Setting consistency to ALL on large write-heavy jobs causes high latency and failure rates. Every write must be acknowledged by every replica, so a single slow or unavailable replica produces a timeout and triggers expensive retries.
Use ONE or QUORUM (LOCAL_ONE or LOCAL_QUORUM in multi-data-center clusters) for analytical loads, and configure the retry count:
spark.conf.set("spark.cassandra.output.consistency.level", "ONE")
spark.conf.set("spark.cassandra.query.retry.count", "5")
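The cost of ALL is easiest to see by counting replica acknowledgements. The sketch below applies Cassandra's quorum rule, floor(rf / 2) + 1, where rf is the replication factor (summed across data centers for QUORUM, or the local data center's value for LOCAL_QUORUM); ConsistencyMath is a hypothetical helper.

```scala
// Replica acknowledgements per consistency level. Hypothetical helper;
// rf is the replication factor relevant to the level (see lead-in).
object ConsistencyMath {
  sealed trait Level
  case object One extends Level
  case object Quorum extends Level
  case object All extends Level

  def requiredAcks(level: Level, rf: Int): Int = {
    require(rf > 0)
    level match {
      case One    => 1
      case Quorum => rf / 2 + 1 // integer division implements the floor
      case All    => rf
    }
  }

  // Replicas that can be down or slow while a request still succeeds.
  def toleratedFailures(level: Level, rf: Int): Int = rf - requiredAcks(level, rf)

  def main(args: Array[String]): Unit =
    Seq[Level](One, Quorum, All).foreach { l =>
      println(s"$l with RF=3: needs ${requiredAcks(l, 3)} acks, tolerates ${toleratedFailures(l, 3)} down")
    }
}
```

With ALL, toleratedFailures is always zero, which is why a single struggling node can fail a write-heavy job.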
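Connection Pool and Network Limits
"Too many open connections" errors occur when the number of client connections exceeds what the Cassandra nodes accept. Each executor keeps a pool of driver connections to the nodes it contacts, so high executor counts can exhaust the cluster's connection budget. The relevant server limits in cassandra.yaml are native_transport_max_concurrent_connections and native_transport_max_concurrent_connections_per_ip; native_transport_max_threads caps request-processing threads rather than connections.
Close idle connections sooner, reduce the number of executors, or raise the server-side connection limits: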
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "5000")
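A quick estimate of the connections a job opens helps decide whether to tune the client or the server. This sketch assumes each executor JVM shares one driver session and opens a fixed number of connections to each Cassandra node (connsPerNode depends on your driver and connector pool settings); ConnectionBudget is a hypothetical helper, and connections from the Spark driver process are ignored.

```scala
// Rough connection estimate for one Spark job. Hypothetical helper; assumes
// one shared session per executor JVM and a fixed connsPerNode per node.
object ConnectionBudget {
  // Total client connections across the whole cluster.
  def totalConnections(executors: Int, cassandraNodes: Int, connsPerNode: Int): Int =
    executors * cassandraNodes * connsPerNode

  // Connections a single Cassandra node receives from this job.
  def connectionsPerNode(executors: Int, connsPerNode: Int): Int =
    executors * connsPerNode

  // Compare against a per-node server limit; None models an unlimited setting.
  def withinLimit(perNode: Int, serverLimit: Option[Int]): Boolean =
    serverLimit.forall(limit => perNode <= limit)

  def main(args: Array[String]): Unit = {
    val perNode = connectionsPerNode(executors = 50, connsPerNode = 2)
    println(s"total = ${totalConnections(50, 6, 2)}, per node = $perNode, " +
      s"fits a limit of 64: ${withinLimit(perNode, Some(64))}")
  }
}
```

Since the per-node count grows with executors rather than tasks, reducing executors or idle lifetime shrinks it directly.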
Version Compatibility and Driver Mismatches
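ClassNotFoundException or NoSuchMethodError typically indicate version skew between Spark, the connector, and the DataStax Java driver. Each connector release is built for a specific Spark line and Scala binary version and depends on a specific Java driver version, so putting a different driver, or a connector built for another Spark or Scala version, on the classpath produces these errors.
Verify compatibility using the connector's official compatibility matrix. When moving to Cassandra 4.x or later, pick a connector release whose bundled driver supports it rather than swapping the Java driver by hand. Configuration keys also changed between releases (for example, spark.cassandra.input.split.size_in_mb and spark.cassandra.input.fetch.size_in_rows became spark.cassandra.input.split.sizeInMB and spark.cassandra.input.fetch.sizeInRows), so match key names to the reference documentation for your version.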
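A common source of NoSuchMethodError is a connector artifact built for a different Scala binary version than the application. This hypothetical pre-flight check compares the _2.12-style suffix in a Maven coordinate with the running Scala version; the coordinate shown is only an example, so substitute the one from your build.

```scala
// Hypothetical pre-flight check for Scala binary-version mismatches.
object ScalaBinaryCheck {
  // "2.12.18" -> "2.12"; Scala 3 versions such as "3.3.1" -> "3".
  def binaryVersion(full: String): String = {
    val parts = full.split('.')
    if (parts.head == "3") "3" else parts.take(2).mkString(".")
  }

  // "group:artifact_2.12:version" -> Some("2.12"); no suffix -> None.
  def artifactScalaSuffix(coordinate: String): Option[String] =
    coordinate.split(':') match {
      case Array(_, artifact, _) if artifact.contains("_") =>
        Some(artifact.substring(artifact.lastIndexOf('_') + 1))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Reports the Scala standard-library version (2.13.x even under Scala 3).
    val runtime = binaryVersion(scala.util.Properties.versionNumberString)
    val coord = "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0" // example only
    val suffix = artifactScalaSuffix(coord)
    println(s"runtime Scala $runtime, connector built for ${suffix.getOrElse("unknown")}, " +
      s"match = ${suffix.contains(runtime)}")
  }
}
```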
Optimized Configuration Examples for Large Datasets
Optimized Read for a 500 GB Table
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("CassandraReadTune")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .config("spark.cassandra.input.split.size_in_mb", "16")     // smaller splits, more parallel tasks
  .config("spark.cassandra.input.fetch.size_in_rows", "5000") // rows fetched per page
  .getOrCreate()
import spark.implicits._ // required for the $"column" syntax below
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "events"))
  .load()
df.filter($"event_type" === "click")
  .groupBy($"user_id")
  .count()
  .show()
Bulk Write with Controlled Batching
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
  .appName("CassandraBulkWrite")
  .config("spark.cassandra.output.batch.size.rows", "100")   // rows per batch
  .config("spark.cassandra.output.concurrent.writes", "20")  // batches in flight per task
  .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
  .getOrCreate()
// largeDf: an existing DataFrame whose columns match the analytics.events table
largeDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "events"))
  .mode(SaveMode.Append)
  .save()
Tuning Consistency and Retries
spark.conf.set("spark.cassandra.query.retry.count", "5")
spark.conf.set("spark.cassandra.output.consistency.level", "ONE")
spark.conf.set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
Summary
- Partitioning bottlenecks occur when spark.cassandra.input.split.size_in_mb is too large for the data; reduce this value or use the splitCount read option to match executor cores.
- Memory pressure arises from excessive fetch sizes and oversized splits; limit spark.cassandra.input.fetch.size_in_rows to a few thousand rows per page (1000 by default) and lower it for wide rows.
- Coordinator overload during writes results from large batch sizes; keep spark.cassandra.output.batch.size.rows at or below 200 and raise spark.cassandra.output.concurrent.writes only while the cluster has headroom.
- Consistency level ALL causes high latency and failure rates on large jobs; prefer ONE or QUORUM (or their LOCAL_ variants) for analytical workloads.
- Connection errors require tuning spark.cassandra.connection.keep_alive_ms or raising Cassandra's native_transport_max_concurrent_connections limits to prevent "too many connections" errors.
Frequently Asked Questions
Why does my Spark job fail with OutOfMemoryError when reading from Cassandra?
This usually happens when a task holds too much data at once, either because each fetched page of wide rows is large or because Spark partitions are oversized. Keep spark.cassandra.input.fetch.size_in_rows in the low thousands (the default is 1000) and lower it for wide rows, and reduce spark.cassandra.input.split.size_in_mb so that each task reads less data.
How do I prevent "Write timeout" exceptions during bulk loads to Cassandra?
Write timeouts indicate coordinator hotspotting or an overloaded cluster. Decrease spark.cassandra.output.batch.size.rows to 100–200 rows per batch and, if the cluster has spare capacity, increase spark.cassandra.output.concurrent.writes (for example to 20) to allow more parallel in-flight batches. Keep spark.cassandra.output.batch.grouping.key at its default, partition, so each batch contains rows for a single partition key.
What causes "Too many open connections" errors in the Spark Cassandra connector?
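Each executor maintains a pool of connections to the Cassandra nodes it contacts. When many executors run at once, the total connection count can exceed the limits set by native_transport_max_concurrent_connections or native_transport_max_concurrent_connections_per_ip in cassandra.yaml (native_transport_max_threads limits request-processing threads, not connections). Lower spark.cassandra.connection.keep_alive_ms so idle connections close sooner, reduce the number of executors, or raise the server-side connection limits.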
Which consistency level should I use for large-scale analytical workloads?
For read-heavy analytical jobs processing billions of rows, use ONE or LOCAL_QUORUM for reads to minimize latency and avoid retry storms. For writes, use ONE or QUORUM rather than ALL, as ALL requires acknowledgment from every replica and causes high failure rates under network variability or node stress.