kedro_datasets.spark.SparkJDBCDataset¶
- class kedro_datasets.spark.SparkJDBCDataset(*, url, table, credentials=None, load_args=None, save_args=None, metadata=None)[source]¶
SparkJDBCDatasetloads data from a database table accessible via JDBC URL url and connection properties and saves the content of a PySpark DataFrame to an external database table via JDBC. It usespyspark.sql.DataFrameReaderandpyspark.sql.DataFrameWriterinternally, so it supports all allowed PySpark options onjdbc.Example usage for the YAML API:
weather: type: spark.SparkJDBCDataset table: weather_table url: jdbc:postgresql://localhost/test credentials: db_credentials load_args: properties: driver: org.postgresql.Driver save_args: properties: driver: org.postgresql.Driver
Example usage for the Python API:
import pandas as pd from kedro_datasets.spark import SparkJDBCDataset from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() data = spark.createDataFrame( ... pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) ... ) url = "jdbc:postgresql://localhost/test" table = "table_a" connection_properties = {"driver": "org.postgresql.Driver"} dataset = SparkJDBCDataset( ... url=url, ... table=table, ... credentials={"user": "scott", "password": "tiger"}, ... load_args={"properties": connection_properties}, ... save_args={"properties": connection_properties}, ... ) dataset.save(data) reloaded = dataset.load() assert data.toPandas().equals(reloaded.toPandas())
Attributes
Methods
exists()Checks whether a data set's output already exists by calling the provided _exists() method.
from_config(name, config[, load_version, ...])Create a data set instance using the configuration provided.
load()Loads data by delegation to the provided load method.
release()Release any cached data.
save(data)Saves data by delegation to the provided save method.
- __init__(*, url, table, credentials=None, load_args=None, save_args=None, metadata=None)[source]¶
Creates a new
SparkJDBCDataset.- Parameters:
url (
str) – A JDBC URL of the formjdbc:subprotocol:subname.table (
str) – The name of the table to load or save data to.credentials (
Optional[dict[str,Any]]) – A dictionary of JDBC database connection arguments. Normally at least propertiesuserandpasswordwith their corresponding values. It updatespropertiesparameter inload_argsandsave_argsin case it is provided.load_args (
Optional[dict[str,Any]]) – Provided to underlying PySparkjdbcfunction along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.htmlsave_args (
Optional[dict[str,Any]]) – Provided to underlying PySparkjdbcfunction along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.htmlmetadata (
Optional[dict[str,Any]]) – Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.
- Raises:
DatasetError – When either
urlortableis empty or when a property is provided with a None value.
- exists()¶
Checks whether a data set’s output already exists by calling the provided _exists() method.
- Return type:
- Returns:
Flag indicating whether the output already exists.
- Raises:
DatasetError – when underlying exists method raises error.
- classmethod from_config(name, config, load_version=None, save_version=None)¶
Create a data set instance using the configuration provided.
- Parameters:
name (str) – Data set name.
load_version (str | None) – Version string to be used for
loadoperation if the data set is versioned. Has no effect on the data set if versioning was not enabled.save_version (str | None) – Version string to be used for
saveoperation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
- Return type:
AbstractDataset
- Returns:
An instance of an
AbstractDatasetsubclass.- Raises:
DatasetError – When the function fails to create the data set from its config.
- load()¶
Loads data by delegation to the provided load method.
- Return type:
TypeVar(_DO)- Returns:
Data returned by the provided load method.
- Raises:
DatasetError – When underlying load method raises error.
- release()¶
Release any cached data.
- Raises:
DatasetError – when underlying release method raises error.
- Return type:
- save(data)¶
Saves data by delegation to the provided save method.
- Parameters:
data (
TypeVar(_DI)) – the value to be saved by provided save method.- Raises:
DatasetError – when underlying save method raises error.
FileNotFoundError – when save method got file instead of dir, on Windows.
NotADirectoryError – when save method got file instead of dir, on Unix.
- Return type: