SQLTableDataset

SQLTableDataset loads data from a SQL table and saves a pandas DataFrame to a table.

kedro_datasets.pandas.SQLTableDataset

SQLTableDataset(
    *,
    table_name,
    credentials,
    load_args=None,
    save_args=None,
    metadata=None
)

Bases: AbstractDataset[DataFrame, DataFrame]

SQLTableDataset loads data from a SQL table and saves a pandas DataFrame to a table. It uses pandas internally, so it supports every option accepted by pandas.read_sql_table and DataFrame.to_sql. Because pandas relies on SQLAlchemy behind the scenes, SQLTableDataset must be instantiated with a compatible connection string, passed under the 'con' key of credentials (see the example code snippet below). Connection string formats supported by SQLAlchemy are listed here: https://docs.sqlalchemy.org/core/engines.html#database-urls

By default, SQLTableDataset adds index=False to the save arguments, so the DataFrame index is not written to the table. This keeps the load and save methods symmetric.

Examples:

Using the YAML API:

shuttles_table_dataset:
  type: pandas.SQLTableDataset
  credentials: db_credentials
  table_name: shuttles
  load_args:
    schema: dwschema
  save_args:
    schema: dwschema
    if_exists: replace

Sample database credentials entry in credentials.yml:

db_credentials:
  con: postgresql://scott:tiger@localhost/test
  pool_size: 10 # additional parameters
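
As a rough illustration of how such an entry is consumed: the constructor source further down keeps 'con' as the connection string and treats every other key as an extra engine argument, which create_connection then forwards to SQLAlchemy's create_engine. The sketch below reproduces only that split with plain dictionaries; the credentials dict is a hand-written stand-in for the parsed credentials.yml entry, not something Kedro produced.

```python
# Hand-written stand-in for the parsed `db_credentials` entry above.
credentials = {
    "con": "postgresql://scott:tiger@localhost/test",
    "pool_size": 10,
}

# The connection string comes from the 'con' key...
connection_str = credentials["con"]
# ...and every remaining key is kept as an additional engine argument.
connection_args = {k: v for k, v in credentials.items() if k != "con"}

print(connection_str)
print(connection_args)
```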

Using the Python API:

>>> import pandas as pd
>>> from kedro_datasets.pandas import SQLTableDataset
>>>
>>> data = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> table_name = "table_a"
>>> credentials = {"con": f"sqlite:///{tmp_path / 'test.db'}"}
>>> dataset = SQLTableDataset(table_name=table_name, credentials=credentials)
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.equals(reloaded)

Parameters:

  • table_name (str) –

    The name of the table to load data from or save data to. It overrides any name set in save_args and any table_name set in load_args.

  • credentials (dict[str, Any]) –

    A dictionary containing a SQLAlchemy connection string. The connection string must be provided under the 'con' key. To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls Additional parameters for the SQLAlchemy engine can be provided alongside the 'con' key.

  • load_args (dict[str, Any] | None, default: None ) –

    Passed to the underlying pandas read_sql_table function along with the connection. To find all supported arguments, see here: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_table.html To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls

  • save_args (dict[str, Any] | None, default: None ) –

    Passed to the underlying pandas to_sql function along with the connection. To find all supported arguments, see here: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html To find all supported connection string formats, see here: https://docs.sqlalchemy.org/core/engines.html#database-urls The default parameters include index=False.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

  • DatasetError –

    When either table_name or con is empty.
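
The constructor rejects an empty table_name and a missing or empty 'con' entry before doing anything else. The following is a minimal standalone sketch of those two checks, copied from the constructor source shown below; DatasetError is redefined locally as a stand-in so the snippet runs without Kedro installed, and the helper names validate and fails are hypothetical.

```python
class DatasetError(Exception):
    """Local stand-in for Kedro's DatasetError (assumption: not the real class)."""


def validate(table_name, credentials):
    """Mirror the two checks at the top of SQLTableDataset.__init__."""
    if not table_name:
        raise DatasetError("'table_name' argument cannot be empty.")
    if not (credentials and "con" in credentials and credentials["con"]):
        raise DatasetError(
            "'con' argument cannot be empty. Please "
            "provide a SQLAlchemy connection string."
        )


def fails(table_name, credentials):
    """Return True when validation raises DatasetError, False otherwise."""
    try:
        validate(table_name, credentials)
    except DatasetError:
        return True
    return False


print(fails("", {"con": "sqlite:///test.db"}))         # empty table name
print(fails("table_a", {}))                            # no credentials at all
print(fails("table_a", {"con": ""}))                   # empty connection string
print(fails("table_a", {"con": "sqlite:///test.db"}))  # valid input
```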

Source code in kedro_datasets/pandas/sql_dataset.py
def __init__(  # noqa: PLR0913
    self,
    *,
    table_name: str,
    credentials: dict[str, Any],
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new ``SQLTableDataset``.

    Args:
        table_name: The table name to load or save data to. It
            overwrites name in ``save_args`` and ``table_name``
            parameters in ``load_args``.
        credentials: A dictionary with a ``SQLAlchemy`` connection string.
            Users are supposed to provide the connection string 'con'
            through credentials.
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
            Additional parameters for the sqlalchemy engine can be provided
            alongside the 'con' parameter.
        load_args: Provided to underlying pandas ``read_sql_table``
            function along with the connection string.
            To find all supported arguments, see here:
            https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_table.html
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
        save_args: Provided to underlying pandas ``to_sql`` function along
            with the connection string.
            To find all supported arguments, see here:
            https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/core/engines.html#database-urls
            It has ``index=False`` in the default parameters.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        DatasetError: When either ``table_name`` or ``con`` is empty.
    """

    if not table_name:
        raise DatasetError("'table_name' argument cannot be empty.")

    if not (credentials and "con" in credentials and credentials["con"]):
        raise DatasetError(
            "'con' argument cannot be empty. Please "
            "provide a SQLAlchemy connection string."
        )

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

    self._load_args["table_name"] = table_name
    self._save_args["name"] = table_name

    self._connection_str = credentials["con"]
    self._connection_args = {
        k: credentials[k] for k in credentials.keys() if k != "con"
    }

    self.metadata = metadata
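
The argument handling in the constructor can be tried in isolation. This sketch repeats the same dictionary merges outside the class to show the precedence: class defaults first, then user-supplied arguments, then the table_name parameter, which always wins. The function name build_args and the "ignored" placeholder values are hypothetical and exist only for illustration.

```python
DEFAULT_LOAD_ARGS = {}
DEFAULT_SAVE_ARGS = {"index": False}


def build_args(table_name, load_args=None, save_args=None):
    """Merge defaults with user arguments the way the constructor does."""
    merged_load = {**DEFAULT_LOAD_ARGS, **(load_args or {})}
    merged_save = {**DEFAULT_SAVE_ARGS, **(save_args or {})}
    # The explicit table name overrides anything passed in the argument dicts.
    merged_load["table_name"] = table_name
    merged_save["name"] = table_name
    return merged_load, merged_save


load_kwargs, save_kwargs = build_args(
    "shuttles",
    load_args={"schema": "dwschema", "table_name": "ignored"},
    save_args={"schema": "dwschema", "if_exists": "replace", "name": "ignored"},
)
print(load_kwargs)
print(save_kwargs)
```

Because user arguments are merged after the defaults, passing index=True in save_args would replace the default index=False.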

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {'index': False}

_connection_args instance-attribute

_connection_args = {
    k: credentials[k] for k in credentials.keys() if k != "con"
}

_connection_str instance-attribute

_connection_str = credentials['con']

_load_args instance-attribute

_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}

_save_args instance-attribute

_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}

engine property

engine

The Engine object for the dataset's connection string.

engines class-attribute instance-attribute

engines = {}

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/pandas/sql_dataset.py
def _describe(self) -> dict[str, Any]:
    load_args = copy.deepcopy(self._load_args)
    save_args = copy.deepcopy(self._save_args)
    del load_args["table_name"]
    del save_args["name"]
    return {
        "table_name": self._load_args["table_name"],
        "load_args": load_args,
        "save_args": save_args,
    }

_exists

_exists()
Source code in kedro_datasets/pandas/sql_dataset.py
def _exists(self) -> bool:
    insp = inspect(self.engine)
    schema = self._load_args.get("schema", None)
    return insp.has_table(self._load_args["table_name"], schema)

create_connection classmethod

create_connection(connection_str, connection_args=None)

Given a connection string, create singleton connection to be used across all instances of SQLTableDataset that need to connect to the same source.

Source code in kedro_datasets/pandas/sql_dataset.py
@classmethod
def create_connection(
    cls, connection_str: str, connection_args: dict | None = None
) -> None:
    """Given a connection string, create singleton connection
    to be used across all instances of ``SQLTableDataset`` that
    need to connect to the same source.
    """
    connection_args = connection_args or {}
    try:
        engine = create_engine(connection_str, **connection_args)
    except ImportError as import_error:
        raise _get_missing_module_error(import_error) from import_error
    except NoSuchModuleError as exc:
        raise _get_sql_alchemy_missing_error() from exc

    cls.engines[connection_str] = engine
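
The caching pattern behind create_connection can be sketched without SQLAlchemy. In the toy class below, the standard library's sqlite3.connect stands in for create_engine, and the class-level engines dict is keyed by connection string as in the source above. The lazy lookup inside the engine property is an assumption, since the property's body is not shown on this page, and the class name CachedConnectionDataset is hypothetical.

```python
import sqlite3


class CachedConnectionDataset:
    """Toy class that mimics the caching pattern, not the real SQLTableDataset."""

    # Shared by all instances: one entry per connection string.
    engines = {}

    def __init__(self, connection_str):
        self._connection_str = connection_str

    @classmethod
    def create_connection(cls, connection_str):
        # sqlite3.connect stands in for sqlalchemy.create_engine here.
        cls.engines[connection_str] = sqlite3.connect(connection_str)

    @property
    def engine(self):
        # Assumed behavior: create the connection on first access, then reuse it.
        cls = type(self)
        if self._connection_str not in cls.engines:
            cls.create_connection(self._connection_str)
        return cls.engines[self._connection_str]


first = CachedConnectionDataset(":memory:")
second = CachedConnectionDataset(":memory:")

# Both instances use the same connection string, so they share one object.
print(first.engine is second.engine)
print(len(CachedConnectionDataset.engines))
```

Keying the cache by connection string means two datasets pointing at the same database reuse one engine, while a dataset with a different connection string gets its own entry.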

load

load()
Source code in kedro_datasets/pandas/sql_dataset.py
def load(self) -> pd.DataFrame:
    return pd.read_sql_table(con=self.engine, **self._load_args)

preview

preview(nrows=5)

Generate a preview of the dataset with a specified number of rows.

Parameters:

  • nrows (int, default: 5 ) –

    The number of rows to include in the preview. Defaults to 5.

Returns:

  • dict

    A dictionary containing the data in a split format.

Source code in kedro_datasets/pandas/sql_dataset.py
def preview(self, nrows: int = 5) -> TablePreview:
    """
    Generate a preview of the dataset with a specified number of rows.

    Args:
        nrows: The number of rows to include in the preview. Defaults to 5.

    Returns:
        dict: A dictionary containing the data in a split format.
    """

    table_name = self._load_args["table_name"]

    metadata = MetaData()
    table_ref = Table(table_name, metadata, autoload_with=self.engine)

    query = select(table_ref).limit(nrows)  # type: ignore[arg-type]

    with self.engine.connect() as conn:
        result = conn.execute(query)
        data_preview = pd.DataFrame(result.fetchall(), columns=result.keys())

    preview_data = data_preview.to_dict(orient="split")
    return preview_data
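
To make the "split" format concrete, here is a dependency-free sketch that runs a LIMIT query and assembles the same three keys that DataFrame.to_dict(orient="split") produces: index, columns and data. It uses the standard library's sqlite3 module instead of SQLAlchemy and pandas, so it approximates the shape of the result and is not the method's actual implementation. The shuttles table contents and the helper name preview_split are made up for illustration.

```python
import sqlite3

# In-memory database with a small, made-up table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE shuttles (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO shuttles VALUES (?, ?)",
    [(1, "alpha"), (2, "beta"), (3, "gamma")],
)


def preview_split(connection, table_name, nrows=5):
    """Return the first `nrows` rows of a table in a split-style dictionary."""
    # Identifiers cannot be bound as parameters, so the table name is
    # interpolated here; acceptable for a sketch, not for untrusted input.
    cursor = connection.execute(f'SELECT * FROM "{table_name}" LIMIT ?', (nrows,))
    columns = [description[0] for description in cursor.description]
    rows = [list(row) for row in cursor.fetchall()]
    return {
        "index": list(range(len(rows))),  # default 0..n-1 row labels
        "columns": columns,
        "data": rows,
    }


result = preview_split(conn, "shuttles", nrows=2)
print(result)
```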

save

save(data)
Source code in kedro_datasets/pandas/sql_dataset.py
def save(self, data: pd.DataFrame) -> None:
    data.to_sql(con=self.engine, **self._save_args)