Skip to content

SparkJDBCDataset

SparkJDBCDataset loads and saves data to JDBC sources using Apache Spark.

kedro_datasets.spark.SparkJDBCDataset

SparkJDBCDataset(
    *,
    url,
    table,
    credentials=None,
    load_args=None,
    save_args=None,
    metadata=None
)

Bases: AbstractDataset[DataFrame, DataFrame]

SparkJDBCDataset loads data from a database table reachable through the JDBC URL url and the supplied connection properties, and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports every option that PySpark allows on jdbc.

Examples:

Using the YAML API:

weather:
  type: spark.SparkJDBCDataset
  table: weather_table
  url: jdbc:postgresql://localhost/test
  credentials: db_credentials
  load_args:
    properties:
      driver: org.postgresql.Driver
  save_args:
    properties:
      driver: org.postgresql.Driver

Using the Python API:

>>> import pandas as pd
>>> from kedro_datasets.spark import SparkJDBCDataset
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>> data = spark.createDataFrame(
...     pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
... )
>>>
>>> url = "jdbc:postgresql://localhost/test"
>>> table = "table_a"
>>> connection_properties = {"driver": "org.postgresql.Driver"}
>>> dataset = SparkJDBCDataset(
...     url=url,
...     table=table,
...     credentials={"user": "scott", "password": "tiger"},
...     load_args={"properties": connection_properties},
...     save_args={"properties": connection_properties},
... )
>>>
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.toPandas().equals(reloaded.toPandas())

Parameters:

  • url (str) –

    A JDBC URL of the form jdbc:subprotocol:subname.

  • table (str) –

    The name of the table to load or save data to.

  • credentials (dict[str, Any] | None, default: None ) –

    A dictionary of JDBC database connection arguments, normally at least the properties user and password with their corresponding values. When provided, it is merged into the properties entry of both load_args and save_args, overriding any keys they share.

  • load_args (dict[str, Any] | None, default: None ) –

    Provided to the underlying PySpark jdbc reader function along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.jdbc.html

  • save_args (dict[str, Any] | None, default: None ) –

    Provided to underlying PySpark jdbc function along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.html

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

  • DatasetError

    When either url or table is empty, or when a credential is provided with a None value.

Source code in kedro_datasets/spark/spark_jdbc_dataset.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(  # noqa: PLR0913
    self,
    *,
    url: str,
    table: str,
    credentials: dict[str, Any] | None = None,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new ``SparkJDBCDataset``.

    Args:
        url: A JDBC URL of the form ``jdbc:subprotocol:subname``.
        table: The name of the table to load or save data to.
        credentials: A dictionary of JDBC database connection arguments.
            Normally at least properties ``user`` and ``password`` with
            their corresponding values.  It updates ``properties``
            parameter in ``load_args`` and ``save_args`` in case it is
            provided.
        load_args: Provided to underlying PySpark ``jdbc`` function along
            with the JDBC URL and the name of the table. To find all
            supported arguments, see here:
            https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.html
        save_args: Provided to underlying PySpark ``jdbc`` function along
            with the JDBC URL and the name of the table. To find all
            supported arguments, see here:
            https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.html
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: When either ``url`` or ``table`` is empty or
            when a property is provided with a None value.
    """

    if not url:
        raise DatasetError(
            "'url' argument cannot be empty. Please "
            "provide a JDBC URL of the form "
            "'jdbc:subprotocol:subname'."
        )

    if not table:
        raise DatasetError(
            "'table' argument cannot be empty. Please "
            "provide the name of the table to load or save "
            "data to."
        )

    self._url = url
    self._table = table

    self.metadata = metadata

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

    # Update properties in load_args and save_args with credentials.
    if credentials is not None:
        # Check credentials for bad inputs.
        for cred_key, cred_value in credentials.items():
            if cred_value is None:
                raise DatasetError(
                    f"Credential property '{cred_key}' cannot be None. "
                    f"Please provide a value."
                )

        load_properties = self._load_args.get("properties", {})
        save_properties = self._save_args.get("properties", {})
        self._load_args["properties"] = {**load_properties, **credentials}
        self._save_args["properties"] = {**save_properties, **credentials}

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {}

_load_args instance-attribute

_load_args = {
    **DEFAULT_LOAD_ARGS,
    **(load_args or {}),
}

_save_args instance-attribute

_save_args = {
    **DEFAULT_SAVE_ARGS,
    **(save_args or {}),
}

_table instance-attribute

_table = table

_url instance-attribute

_url = url

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def _describe(self) -> dict[str, Any]:
    """Return a description of this dataset with login secrets removed.

    The result holds the JDBC URL, the table name and the load/save
    arguments. Where an argument set contains a ``properties`` entry, the
    ``user`` and ``password`` keys are dropped from a copy of it, so the
    stored arguments themselves are never modified.
    """
    summary: dict[str, Any] = {"url": self._url, "table": self._table}

    for label, stored_args in (
        ("load_args", self._load_args),
        ("save_args", self._save_args),
    ):
        shown_args = stored_args
        if "properties" in stored_args:
            # Work on a copy: the real properties are still needed for I/O.
            scrubbed = stored_args["properties"].copy()
            for secret_key in ("user", "password"):
                scrubbed.pop(secret_key, None)
            shown_args = {**stored_args, "properties": scrubbed}
        summary[label] = shown_args

    return summary

load

load()
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
164
165
def load(self) -> DataFrame:
    """Read the configured table over JDBC and return the resulting DataFrame.

    The reader is taken from the session returned by ``get_spark()``; the
    stored load arguments are forwarded as keyword arguments.
    """
    reader = get_spark().read
    return reader.jdbc(self._url, self._table, **self._load_args)

save

save(data)
Source code in kedro_datasets/spark/spark_jdbc_dataset.py
167
168
def save(self, data: DataFrame) -> None:
    """Write ``data`` to the configured table over JDBC.

    The stored save arguments are forwarded as keyword arguments to the
    DataFrame writer's ``jdbc`` call, whose result is returned unchanged.
    """
    writer = data.write
    return writer.jdbc(self._url, self._table, **self._save_args)