How to Use Spark JDBC to Read and Write Database Data in PySpark
Spark JDBC enables PySpark to treat relational database tables as DataFrames by forwarding Python API calls to the JVM-based JdbcRelationProvider, which handles connection management, dialect-specific SQL generation, and partitioned reads or batched writes.
The apache/spark repository ships a JDBC data source that bridges PySpark and relational databases. With Spark JDBC, you can read whole tables or parallel partitions into DataFrames and write distributed data back to SQL tables with configurable batching and transaction isolation.
Reading Data with Spark JDBC
The Python entry point for JDBC reads is DataFrameReader.jdbc in python/pyspark/sql/readwriter.py (lines 1062‑1182). This method builds a Java Properties object from the supplied dictionary and delegates to the JVM reader, which routes to JdbcRelationProvider.createRelation for reading (lines 30‑44 in JdbcRelationProvider.scala). The Scala implementation creates a JDBCRelation, fetches the schema via getSchema, and optionally partitions the scan using columnPartition.
Basic Table Reads
For a non-partitioned read of an entire table, supply the JDBC URL, table name, and connection properties:
url = "jdbc:derby:memory:mydb;create=true"
table = "people"
df = spark.read.jdbc(
    url=url,
    table=table,
    properties={"user": "APP", "password": "secret"}
)
df.show()
Parallel Reads with Column-Range Partitioning
To distribute the read across multiple Spark tasks, specify a numeric partition column along with bounds and partition count. This triggers the column‑range path in the reader:
df = spark.read.jdbc(
    url=url,
    table=table,
    column="id",  # Partitioning column
    lowerBound=1,
    upperBound=10000,
    numPartitions=10,
    properties={"user": "APP", "password": "secret"}
)
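The column, lowerBound, upperBound, and numPartitions parameters map to JDBCRelation.columnPartition, which generates one WHERE clause per partition. The clauses are half-open ranges of the form id >= x AND id < y; the first partition also picks up NULL values and the last one is open-ended. As a result, lowerBound and upperBound only set the partition stride; they do not filter rows out of the result.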
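To make the shape of those per-partition clauses concrete, here is a simplified pure-Python sketch. The function name column_range_predicates is hypothetical, and the stride arithmetic is a plain integer approximation; Spark's own stride computation differs in its rounding details, so exact boundaries may not match.

```python
def column_range_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause string per partition (simplified sketch)."""
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    # Never create more partitions than there are integer values in the range.
    n = max(1, min(num_partitions, upper_bound - lower_bound))
    if n == 1:
        return []  # a single partition needs no predicate
    # Assumption: plain integer stride, not Spark's exact rounding.
    stride = upper_bound // n - lower_bound // n
    predicates = []
    current = lower_bound
    for i in range(n):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < n - 1 else None
        if lower is None:
            # First partition: open below, and it also collects NULLs.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # Last partition: open above.
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


predicates = column_range_predicates("id", 1, 10000, 10)
for p in predicates:
    print(p)
```

Because the first and last ranges are open-ended, rows with id values outside the bounds still land in a partition, which is why badly chosen bounds cause skew rather than data loss.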
Parallel Reads with Predicate Lists
Alternatively, provide an explicit list of predicates. Each predicate becomes a separate Spark partition, allowing fine-grained control over the query plan:
predicates = [
    "id BETWEEN 1 AND 1000",
    "id BETWEEN 1001 AND 2000",
    "id BETWEEN 2001 AND 3000"
]
df = spark.read.jdbc(
    url=url,
    table=table,
    predicates=predicates,
    properties={"user": "APP", "password": "secret"}
)
This path is handled in the if predicates is not None block of DataFrameReader.jdbc, bypassing the column‑range logic.
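Spark does not check that a predicate list is free of gaps or overlaps: a row matched by two predicates is read twice, and a row matched by none is dropped. One way to sanity-check a list before using it is to compare per-predicate row counts with the table total. The sketch below uses Python's built-in sqlite3 as a stand-in database; the helper name predicate_row_counts and the sample table are assumptions made for illustration.

```python
import sqlite3


def predicate_row_counts(conn, table, predicates):
    """Return (total rows, [rows matched by each predicate])."""
    cur = conn.cursor()
    total = cur.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    counts = [
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {p}").fetchone()[0]
        for p in predicates
    ]
    return total, counts


# Stand-in table with ids 1..3000 (illustrative data only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?)",
    [(i, f"person-{i}") for i in range(1, 3001)],
)

predicates = [
    "id BETWEEN 1 AND 1000",
    "id BETWEEN 1001 AND 2000",
    "id BETWEEN 2001 AND 3000",
]
total, counts = predicate_row_counts(conn, "people", predicates)
print(total, counts)
```

Matching sums are a necessary check rather than a proof of disjointness, but they catch the common mistakes such as a boundary value shared by two BETWEEN ranges.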
Writing Data with Spark JDBC
Writes originate in DataFrameWriter.jdbc (python/pyspark/sql/readwriter.py, lines 2290‑2348). The Python wrapper constructs a Properties object and invokes the JVM writer, which routes to JdbcRelationProvider.createRelation for writing (lines 52‑99). The Scala implementation checks table existence, applies the specified SaveMode, and delegates to saveTable for batch insertion.
Basic Table Writes
Use DataFrameWriter.jdbc with a mode parameter to control write behavior:
df_to_write = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    ["id", "name"]
)
df_to_write.write.jdbc(
    url=url,
    table="people_copy",
    mode="overwrite",  # Options: append, overwrite, ignore, error
    properties={"user": "APP", "password": "secret"}
)
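When the target table already exists, overwrite mode issues a DROP TABLE followed by a CREATE TABLE before insertion (unless the truncate option is set), while append inserts into the existing table. If the table does not exist, it is created first in every mode.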
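The decision logic behind the save modes can be summarized as a small pure-Python sketch. The function plan_jdbc_write and its step labels are hypothetical, and the truncate branch is simplified: in Spark it also depends on whether the dialect supports a non-cascading TRUNCATE.

```python
def plan_jdbc_write(mode, table_exists, truncate=False):
    """Return the ordered list of steps a JDBC write would take (sketch)."""
    mode = mode.lower()
    if mode not in {"append", "overwrite", "ignore", "error", "errorifexists"}:
        raise ValueError(f"unknown save mode: {mode}")
    if not table_exists:
        # Every mode creates the table when it is missing.
        return ["CREATE TABLE", "INSERT"]
    if mode == "append":
        return ["INSERT"]
    if mode == "overwrite":
        if truncate:
            # Simplified: Spark also checks dialect support for truncation.
            return ["TRUNCATE TABLE", "INSERT"]
        return ["DROP TABLE", "CREATE TABLE", "INSERT"]
    if mode == "ignore":
        return []  # existing table is left untouched
    raise RuntimeError("table already exists")


print(plan_jdbc_write("overwrite", table_exists=True))
print(plan_jdbc_write("overwrite", table_exists=True, truncate=True))
print(plan_jdbc_write("append", table_exists=False))
```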
Configuring Batch Size and Isolation Levels
Tune performance and transaction semantics via the properties dictionary. The batchsize option controls the number of rows per JDBC batch, and isolationLevel sets the transaction isolation:
df_to_write.write.jdbc(
    url=url,
    table="people_batch",
    mode="append",
    properties={
        "user": "APP",
        "password": "secret",
        "batchsize": "5000",  # Rows per batch
        "isolationLevel": "READ_COMMITTED"
    }
)
These options map to JdbcOptionsInWrite in the Scala layer, influencing how saveTable constructs PreparedStatement batches.
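The effect of batchsize is easiest to see outside Spark. The sketch below chunks rows and sends each chunk in one call, using sqlite3's executemany as a stand-in for a JDBC PreparedStatement batch. The helper insert_in_batches and the tiny batch size are illustrative assumptions, not Spark's implementation.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, rows, batch_size):
    """Insert rows in chunks of batch_size; return the size of each chunk."""
    iterator = iter(rows)
    sizes = []
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        # One round trip per batch instead of one per row.
        conn.executemany("INSERT INTO people_batch VALUES (?, ?)", batch)
        sizes.append(len(batch))
    conn.commit()  # single commit once all batches are sent
    return sizes


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people_batch (id INTEGER, name TEXT)")
rows = [(i, f"name-{i}") for i in range(12)]
batch_sizes = insert_in_batches(conn, rows, batch_size=5)
row_count = conn.execute("SELECT COUNT(*) FROM people_batch").fetchone()[0]
print(batch_sizes, row_count)
```

Larger batches mean fewer round trips but more rows buffered per call, which is the trade-off the batchsize option exposes.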
Key Implementation Details
Understanding the underlying architecture helps optimize Spark JDBC workflows.
Dialect Handling
JdbcDialects.get(url) selects a dialect-specific implementation (e.g., PostgresDialect, MySQLDialect) from the registry in sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala, matching on the JDBC URL. Dialects handle identifier quoting, type mapping, and other database-specific SQL generation.
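The registry pattern can be sketched in a few lines of Python. All class and function names here are hypothetical stand-ins for the Scala code; the only behavior borrowed from Spark is that dialects are matched by URL prefix and that MySQL quotes identifiers with backticks while the default uses double quotes.

```python
class Dialect:
    """Fallback dialect: matches nothing by itself, quotes with double quotes."""

    url_prefix = None

    def can_handle(self, url):
        return self.url_prefix is not None and url.lower().startswith(self.url_prefix)

    def quote_identifier(self, name):
        return f'"{name}"'


class PostgresLikeDialect(Dialect):
    url_prefix = "jdbc:postgresql"


class MySQLLikeDialect(Dialect):
    url_prefix = "jdbc:mysql"

    def quote_identifier(self, name):
        return f"`{name}`"


REGISTRY = [PostgresLikeDialect(), MySQLLikeDialect()]


def get_dialect(url):
    """Return the first registered dialect that accepts the URL."""
    for dialect in REGISTRY:
        if dialect.can_handle(url):
            return dialect
    return Dialect()  # no match: fall back to default behavior


mysql_quoted = get_dialect("jdbc:mysql://localhost/db").quote_identifier("name")
pg_quoted = get_dialect("jdbc:postgresql://localhost/db").quote_identifier("name")
other_quoted = get_dialect("jdbc:derby:memory:mydb").quote_identifier("name")
print(mysql_quoted, pg_quoted, other_quoted)
```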
Connection Management
JdbcDialects.createConnectionFactory builds java.sql.Connection instances that are opened for a single read or write task (one per partition) and closed when that task finishes. Connection pooling is not built-in; use external pool managers if reuse is required.
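The open-use-close lifecycle looks roughly like the following sketch, which again uses sqlite3 as a stand-in for a JDBC connection. The function write_partition, the temporary database file, and the sample rows are assumptions for illustration only.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def write_partition(db_path, rows):
    """Open a connection for one partition, write its rows, then close it."""
    with closing(sqlite3.connect(db_path)) as conn:
        conn.executemany("INSERT INTO people VALUES (?, ?)", rows)
        conn.commit()
    return len(rows)


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "demo.db")
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE people (id INTEGER, name TEXT)")
        conn.commit()

    # Each inner list plays the role of one Spark partition.
    partitions = [[(1, "Alice"), (2, "Bob")], [(3, "Carol")]]
    written = sum(write_partition(db_path, p) for p in partitions)

    with closing(sqlite3.connect(db_path)) as conn:
        total_rows = conn.execute("SELECT COUNT(*) FROM people").fetchone()[0]

print(written, total_rows)
```

With many partitions this pattern opens many short-lived connections, so numPartitions also bounds the load placed on the database.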
Performance Knobs
- fetchsize: Controls the JDBC fetch size for reads, i.e. how many rows are fetched per round trip (affects memory pressure on the executors).
- partitionColumn, lowerBound, upperBound, numPartitions: Drive parallel read partitioning.
- batchsize: Tunes write throughput via JDBC batching.
- truncate: When true, overwrite mode truncates rather than drops/recreates tables (preserves indexes).
- isolationLevel: Controls transaction isolation for writes.
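As a rough illustration of how these read-side knobs fit together, the helper below assembles an options dictionary and rejects an incomplete partitioning specification. The function build_jdbc_read_options is hypothetical; the option keys (url, dbtable, fetchsize, partitionColumn, lowerBound, upperBound, numPartitions) are the names used by the JDBC data source.

```python
RANGE_KEYS = {"partitionColumn", "lowerBound", "upperBound"}
ALLOWED_KEYS = RANGE_KEYS | {"numPartitions"}


def build_jdbc_read_options(url, table, fetchsize=None, **partitioning):
    """Build a string-valued options dict for a JDBC read (sketch)."""
    given = set(partitioning)
    unknown = given - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unknown partitioning options: {sorted(unknown)}")
    # Range partitioning needs the column, both bounds, and the partition count.
    if given & RANGE_KEYS and not ALLOWED_KEYS <= given:
        missing = sorted(ALLOWED_KEYS - given)
        raise ValueError(f"missing partitioning options: {missing}")
    options = {"url": url, "dbtable": table}
    if fetchsize is not None:
        options["fetchsize"] = str(fetchsize)
    options.update({key: str(value) for key, value in partitioning.items()})
    return options


opts = build_jdbc_read_options(
    "jdbc:derby:memory:mydb;create=true",
    "people",
    fetchsize=1000,
    partitionColumn="id",
    lowerBound=1,
    upperBound=10000,
    numPartitions=10,
)
print(opts)
# Assuming an active SparkSession named `spark`, the dict could be used as:
# df = spark.read.format("jdbc").options(**opts).load()
```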
Summary
- Spark JDBC bridges PySpark and relational databases via the DataFrameReader and DataFrameWriter APIs, delegating to the JVM JdbcRelationProvider.
- Reads support full table scans, column-range partitioning (column, lowerBound, upperBound, numPartitions), and predicate-based partitioning for parallel execution.
- Writes respect SaveMode (append, overwrite, ignore, error) and support batching via the batchsize property.
- Performance tuning relies on dialect-aware type mapping, configurable fetch sizes, partition columns, and transaction isolation levels implemented in JdbcDialects and JdbcOptionsInWrite.
Frequently Asked Questions
What is the difference between column-range partitioning and predicate-based partitioning in Spark JDBC?
Column-range partitioning splits the table on a numeric column's value range, automatically generating one range clause per Spark partition (of the form id >= x AND id < y) from lowerBound, upperBound, and numPartitions. The bounds determine the stride only; rows outside them still land in the first or last partition. Predicate-based partitioning requires you to supply explicit WHERE clauses as a list; each predicate becomes its own partition. Use column-range for uniformly distributed data and predicate lists when you need custom filtering logic or non-uniform splits.
How does Spark JDBC handle database dialects like PostgreSQL or MySQL?
Spark uses the JdbcDialects registry (sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala) to select a dialect implementation based on the JDBC URL. Dialects such as PostgresDialect or MySQLDialect define how to quote identifiers, map between Spark SQL types and JDBC types, and generate database-specific SQL. This abstraction ensures that generated SQL and type conversions are compatible with the target database.
Can I control the transaction isolation level when writing via Spark JDBC?
Yes. Pass the isolationLevel property in the connection properties dictionary when calling DataFrameWriter.jdbc. Valid values include READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, and SERIALIZABLE. This setting maps to JdbcOptionsInWrite in the Scala layer and determines the transaction isolation level for the JDBC connection during the write operation.
What file paths contain the core Spark JDBC implementation?
The Python API layer resides in python/pyspark/sql/readwriter.py, containing DataFrameReader.jdbc (lines 1062‑1182) and DataFrameWriter.jdbc (lines 2290‑2348). The JVM implementation is in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala, which handles both read (lines 30‑44) and write (lines 52‑99) logic. Dialect support is implemented in sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala, and utility functions live in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala.